bichnhan2701 commited on
Commit
7d65af7
·
1 Parent(s): 0e732c5

Update server logic

Browse files
app/api/notes/notes_audio.py CHANGED
@@ -2,7 +2,7 @@ from fastapi import APIRouter, BackgroundTasks, HTTPException
2
  from pydantic import BaseModel
3
  from typing import Dict, List, Optional
4
  from app.services.note_store import create_note
5
- from app.jobs.enrichment_job import enrich_note
6
  from app.models.enums import NoteType, NoteStatus
7
  from app.utils.time import now_ts
8
 
@@ -18,10 +18,14 @@ class CreateAudioNoteRequest(BaseModel):
18
 
19
 
20
  @router.post("/audio")
21
- async def create_audio_note(req: CreateAudioNoteRequest, bg: BackgroundTasks):
22
  now = now_ts()
23
  has_enrichment = bool(req.generate)
24
 
 
 
 
 
25
  note = {
26
  "note_id": req.note_id,
27
  "type": NoteType.audio,
@@ -32,6 +36,15 @@ async def create_audio_note(req: CreateAudioNoteRequest, bg: BackgroundTasks):
32
  "updated_at": now,
33
  }
34
 
 
 
 
 
 
 
 
 
 
35
  create_note(note)
36
 
37
  if has_enrichment:
 
2
  from pydantic import BaseModel
3
  from typing import Dict, List, Optional
4
  from app.services.note_store import create_note
5
+ from app.jobs.async_enrichment_job import enrich_note
6
  from app.models.enums import NoteType, NoteStatus
7
  from app.utils.time import now_ts
8
 
 
18
 
19
 
20
  @router.post("/audio")
21
+ async def create_audio_note(req: CreateAudioNoteRequest, bg: BackgroundTasks):
22
  now = now_ts()
23
  has_enrichment = bool(req.generate)
24
 
25
+ allowed = {"normalize", "keywords", "summary", "mindmap"}
26
+ if not set(req.generate).issubset(allowed):
27
+ raise HTTPException(400, "Invalid generate task, generate list must not be empty")
28
+
29
  note = {
30
  "note_id": req.note_id,
31
  "type": NoteType.audio,
 
36
  "updated_at": now,
37
  }
38
 
39
+ if has_enrichment:
40
+ note.update({
41
+ "normalized_text": None,
42
+ "title": None,
43
+ "keywords": None,
44
+ "summary": None,
45
+ "mindmap": None,
46
+ })
47
+
48
  create_note(note)
49
 
50
  if has_enrichment:
app/api/notes/notes_events.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import json
3
+ import logging
4
+ from fastapi import APIRouter
5
+ from sse_starlette.sse import EventSourceResponse
6
+
7
+ from app.services.note_store import get_note
8
+ from app.models.enums import NoteStatus
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+ router = APIRouter()
13
+
14
+ @router.get("/notes/{note_id}/events")
15
+ async def note_events(note_id: str):
16
+ """
17
+ Server-Sent Events endpoint:
18
+ - Push note.status whenever it changes
19
+ - Close stream when status == ready or error
20
+ """
21
+
22
+ async def event_generator():
23
+ last_status = None
24
+
25
+ try:
26
+ while True:
27
+ note = get_note(note_id)
28
+
29
+ if not note:
30
+ yield {
31
+ "event": "error",
32
+ "data": json.dumps({
33
+ "note_id": note_id,
34
+ "status": "not_found",
35
+ }),
36
+ }
37
+ break
38
+
39
+ status = note.get("status")
40
+
41
+ # Push only when status changes
42
+ if status != last_status:
43
+ yield {
44
+ "event": "status",
45
+ "data": json.dumps({
46
+ "note_id": note_id,
47
+ "status": status,
48
+ }),
49
+ }
50
+ last_status = status
51
+
52
+ # Terminal states → close SSE
53
+ if status in (
54
+ NoteStatus.ready,
55
+ NoteStatus.error,
56
+ ):
57
+ break
58
+
59
+ # Avoid hammering Firestore
60
+ await asyncio.sleep(1.0)
61
+
62
+ except asyncio.CancelledError:
63
+ # Client disconnected (normal)
64
+ logger.info(
65
+ "[notes_events] SSE client disconnected note_id=%s",
66
+ note_id,
67
+ )
68
+ except Exception as e:
69
+ logger.exception(
70
+ "[notes_events] SSE error note_id=%s: %s",
71
+ note_id,
72
+ e,
73
+ )
74
+ yield {
75
+ "event": "error",
76
+ "data": json.dumps({
77
+ "note_id": note_id,
78
+ "status": "internal_error",
79
+ }),
80
+ }
81
+
82
+ return EventSourceResponse(
83
+ event_generator(),
84
+ ping=15, # keep-alive, tránh proxy/nginx kill connection
85
+ )
app/api/notes/notes_regenerate.py CHANGED
@@ -4,12 +4,11 @@ from fastapi import APIRouter, BackgroundTasks, HTTPException
4
  from pydantic import BaseModel
5
  from typing import List
6
  from app.services.note_store import get_note, update_note
7
- from app.jobs.enrichment_job import enrich_note
8
 
9
 
10
  router = APIRouter(prefix="/notes", tags=["notes"])
11
 
12
-
13
  class RegenerateRequest(BaseModel):
14
  generate: List[str]
15
 
@@ -19,10 +18,19 @@ def regenerate_note(note_id: str, req: RegenerateRequest, bg: BackgroundTasks):
19
  note = get_note(note_id)
20
  if not note:
21
  raise HTTPException(404, "Note not found")
 
 
 
 
22
 
23
  # mark processing immediately
24
  update_note(note_id, {
25
  "status": NoteStatus.processing,
 
 
 
 
 
26
  "updated_at": now_ts(),
27
  })
28
 
 
4
  from pydantic import BaseModel
5
  from typing import List
6
  from app.services.note_store import get_note, update_note
7
+ from app.jobs.async_enrichment_job import enrich_note
8
 
9
 
10
  router = APIRouter(prefix="/notes", tags=["notes"])
11
 
 
12
  class RegenerateRequest(BaseModel):
13
  generate: List[str]
14
 
 
18
  note = get_note(note_id)
19
  if not note:
20
  raise HTTPException(404, "Note not found")
21
+
22
+ allowed = {"normalize", "keywords", "summary", "mindmap"}
23
+ if not set(req.generate).issubset(allowed):
24
+ raise HTTPException(400, "Invalid generate task, generate list must not be empty")
25
 
26
  # mark processing immediately
27
  update_note(note_id, {
28
  "status": NoteStatus.processing,
29
+ "normalized_text": None,
30
+ "title": None,
31
+ "keywords": None,
32
+ "summary": None,
33
+ "mindmap": None,
34
  "updated_at": now_ts(),
35
  })
36
 
app/api/notes/notes_text.py CHANGED
@@ -2,7 +2,7 @@ from fastapi import APIRouter, BackgroundTasks
2
  from pydantic import BaseModel
3
  from typing import List, Optional
4
  from app.services.note_store import create_note
5
- from app.jobs.enrichment_job import enrich_note
6
  from app.models.enums import NoteType, NoteStatus
7
  from app.utils.id import new_id
8
  from app.utils.time import now_ts
 
2
  from pydantic import BaseModel
3
  from typing import List, Optional
4
  from app.services.note_store import create_note
5
+ from app.jobs.async_enrichment_job import enrich_note
6
  from app.models.enums import NoteType, NoteStatus
7
  from app.utils.id import new_id
8
  from app.utils.time import now_ts
app/jobs/async_enrichment_job.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from app.services.enrichment.mindmap import generate_mindmap
3
+ from app.services.enrichment.normalize import normalize_text
4
+ from app.services.enrichment.summary import generate_summary
5
+ from app.services.enrichment.title_keywords import extract_title_and_keywords
6
+ from app.services.note_store import get_note, update_note
7
+ from app.services.enrichment.batch_pipeline import run_pipeline
8
+ from app.models.enums import NoteStatus
9
+ from app.utils.time import now_ts
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
+ async def enrich_note(note_id: str, tasks: list[str]):
15
+ note = get_note(note_id)
16
+ if not note:
17
+ return
18
+
19
+ update_note(note_id, {
20
+ "status": NoteStatus.processing,
21
+ "updated_at": now_ts(),
22
+ })
23
+
24
+ try:
25
+ # 1️⃣ Normalize
26
+ if "normalize" in tasks:
27
+ text = await normalize_text(note.get("raw_text", ""))
28
+ update_note(note_id, {
29
+ "normalized_text": text,
30
+ "status": NoteStatus.normalize_done,
31
+ "updated_at": now_ts(),
32
+ })
33
+ else:
34
+ text = note.get("raw_text", "")
35
+
36
+ # 2️⃣ Title + Keywords
37
+ if "keywords" in tasks:
38
+ title, keywords = await extract_title_and_keywords(text)
39
+ update_note(note_id, {
40
+ "title": title,
41
+ "keywords": keywords,
42
+ "status": NoteStatus.keywords_done,
43
+ "updated_at": now_ts(),
44
+ })
45
+
46
+ # 3️⃣ Summary
47
+ if "summary" in tasks:
48
+ summary = await generate_summary(text)
49
+ update_note(note_id, {
50
+ "summary": summary,
51
+ "status": NoteStatus.summary_done,
52
+ "updated_at": now_ts(),
53
+ })
54
+
55
+ # 4️⃣ Mindmap
56
+ if "mindmap" in tasks:
57
+ mindmap = await generate_mindmap(text)
58
+ update_note(note_id, {
59
+ "mindmap": mindmap,
60
+ "status": NoteStatus.mindmap_done,
61
+ "updated_at": now_ts(),
62
+ })
63
+
64
+ update_note(note_id, {
65
+ "status": NoteStatus.ready,
66
+ "updated_at": now_ts(),
67
+ })
68
+
69
+ except Exception:
70
+ logger.exception("Enrichment failed note_id=%s", note_id)
71
+ update_note(note_id, {
72
+ "status": NoteStatus.error,
73
+ "updated_at": now_ts(),
74
+ })
app/jobs/enrichment_job.py DELETED
@@ -1,54 +0,0 @@
1
- import logging
2
- from app.services.note_store import get_note, update_note
3
- from app.services.enrichment.pipeline import run_pipeline
4
- from app.models.enums import NoteStatus
5
- from app.utils.time import now_ts
6
-
7
- logger = logging.getLogger(__name__)
8
-
9
-
10
- async def enrich_note(note_id: str, tasks: list[str]):
11
- note = get_note(note_id)
12
- if not note:
13
- return
14
-
15
- # Mark as processing
16
- update_note(
17
- note_id,
18
- {
19
- "status": NoteStatus.processing,
20
- "updated_at": now_ts(),
21
- },
22
- )
23
-
24
- try:
25
- # Run NLP pipeline (mutates a copy of note)
26
- enriched = await run_pipeline(note, tasks)
27
-
28
- updates = {}
29
-
30
- # Only persist known enrichment fields
31
- for field in (
32
- "title",
33
- "normalized_text",
34
- "keywords",
35
- "summary",
36
- "mindmap",
37
- ):
38
- if field in enriched:
39
- updates[field] = enriched[field]
40
-
41
- updates["status"] = NoteStatus.ready
42
- updates["updated_at"] = now_ts()
43
-
44
- update_note(note_id, updates)
45
-
46
- except Exception:
47
- logger.exception("Enrichment failed note_id=%s", note_id)
48
- update_note(
49
- note_id,
50
- {
51
- "status": NoteStatus.error,
52
- "updated_at": now_ts(),
53
- },
54
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/main.py CHANGED
@@ -1,5 +1,5 @@
1
  from fastapi import FastAPI
2
- from app.api.notes import notes_text, notes_audio, notes_get, notes_update, notes_regenerate
3
  from app.api.folders import folders_create, folders_get, folders_update, folders_delete
4
 
5
  app = FastAPI(title="Note Service API")
@@ -10,6 +10,7 @@ app.include_router(notes_audio.router)
10
  app.include_router(notes_get.router)
11
  app.include_router(notes_update.router)
12
  app.include_router(notes_regenerate.router)
 
13
 
14
  # Folders
15
  app.include_router(folders_create.router)
 
1
  from fastapi import FastAPI
2
+ from app.api.notes import notes_text, notes_audio, notes_get, notes_update, notes_regenerate, notes_events
3
  from app.api.folders import folders_create, folders_get, folders_update, folders_delete
4
 
5
  app = FastAPI(title="Note Service API")
 
10
  app.include_router(notes_get.router)
11
  app.include_router(notes_update.router)
12
  app.include_router(notes_regenerate.router)
13
+ app.include_router(notes_events.router)
14
 
15
  # Folders
16
  app.include_router(folders_create.router)
app/models/enums.py CHANGED
@@ -8,4 +8,8 @@ class NoteStatus(str, Enum):
8
  created = "created"
9
  processing = "processing"
10
  ready = "ready"
11
- error = "error"
 
 
 
 
 
8
  created = "created"
9
  processing = "processing"
10
  ready = "ready"
11
+ error = "error"
12
+ normalize_done = "normalize_done"
13
+ keywords_done = "keywords_done"
14
+ summary_done = "summary_done"
15
+ mindmap_done = "mindmap_done"
app/services/enrichment/{pipeline.py → batch_pipeline.py} RENAMED
@@ -2,7 +2,7 @@ from app.services.enrichment.normalize import normalize_text
2
  from app.services.enrichment.title_keywords import extract_title_and_keywords
3
  from app.services.enrichment.summary import generate_summary
4
  from app.services.enrichment.mindmap import generate_mindmap
5
-
6
  async def run_pipeline(note: dict, tasks: list[str]):
7
  raw_text = note.get("raw_text") or ""
8
  if not raw_text.strip():
 
2
  from app.services.enrichment.title_keywords import extract_title_and_keywords
3
  from app.services.enrichment.summary import generate_summary
4
  from app.services.enrichment.mindmap import generate_mindmap
5
+ # dùng cho regenerate / batch
6
  async def run_pipeline(note: dict, tasks: list[str]):
7
  raw_text = note.get("raw_text") or ""
8
  if not raw_text.strip():
requirements.txt CHANGED
@@ -3,3 +3,4 @@ uvicorn
3
  google-genai
4
  firebase-admin
5
  pydantic
 
 
3
  google-genai
4
  firebase-admin
5
  pydantic
6
+ sse-starlette