dlokesha committed on
Commit
2883e00
Β·
1 Parent(s): c092a08

feat: txt ingestion pipeline with chunking and embeddings

Browse files
Files changed (4) hide show
  1. app.py +100 -0
  2. backend/ingestion_txt.py +273 -0
  3. db/schema.sql +18 -0
  4. requirements.txt +2 -0
app.py CHANGED
@@ -6,6 +6,7 @@ from dotenv import load_dotenv
6
  load_dotenv(Path(__file__).resolve().parent.parent / ".env")
7
  load_dotenv(Path(__file__).resolve().parent / ".env")
8
 
 
9
  import gradio as gr
10
 
11
  from backend.notebook_service import create_notebook, list_notebooks, rename_notebook, delete_notebook
@@ -147,6 +148,70 @@ def _build_row_updates(notebooks):
147
  out.append(gr.update(value=name, visible=visible))
148
  return out
149
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
 
151
  with gr.Blocks(
152
  title="NotebookLM Clone - Notebooks",
@@ -229,4 +294,39 @@ with gr.Blocks(
229
  outputs=[selected_notebook_id],
230
  ).then(_on_select, None, [status])
231
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
232
  demo.launch()
 
6
  load_dotenv(Path(__file__).resolve().parent.parent / ".env")
7
  load_dotenv(Path(__file__).resolve().parent / ".env")
8
 
9
+ from datetime import datetime
10
  import gradio as gr
11
 
12
  from backend.notebook_service import create_notebook, list_notebooks, rename_notebook, delete_notebook
 
148
  out.append(gr.update(value=name, visible=visible))
149
  return out
150
 
151
+ # ── Upload Handler Functions ──────────────────────────────────
152
def _do_upload(text_content, title, notebook_id, profile: gr.OAuthProfile | None):
    """Handle direct text input: validate, ingest, and refresh the source list.

    Returns a (status_markdown, sources_markdown) tuple for the two output
    widgets. On validation errors the second element is empty.
    """
    from backend.ingestion_txt import ingest_txt, list_sources

    user_id = _user_id(profile)

    if not user_id:
        return "❌ Please sign in first.", ""
    if not notebook_id:
        return "❌ Please select a notebook first.", ""
    if not text_content or not text_content.strip():
        return "❌ No text entered.", ""

    try:
        # Use title as filename, fallback to timestamp
        filename = (title or "").strip()
        if not filename:
            filename = f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        if not filename.endswith(".txt"):
            filename = filename + ".txt"

        # Convert text to bytes for ingestion pipeline
        file_bytes = text_content.encode("utf-8")

        result = ingest_txt(
            file_bytes=file_bytes,
            filename=filename,
            notebook_id=notebook_id,
            user_id=user_id
        )

        meta = result["metadata"]
        status_msg = (
            f"βœ… **{result['filename']}** saved successfully!\n\n"
            f"- Size: {meta['size_bytes'] / 1024:.1f} KB"
        )

        # Refresh the sources table so the new upload shows up immediately.
        # (This call was previously commented out, so a successful upload
        # blanked the sources display instead of updating it.)
        sources = list_sources(notebook_id)
        return status_msg, _format_sources(sources)

    except ValueError as e:
        return f"❌ {str(e)}", ""
    except Exception as e:
        # Boundary handler: surface anything unexpected to the UI.
        return f"❌ Unexpected error: {str(e)}", ""
196
+
197
+ def _format_sources(sources: list[dict]) -> str:
198
+ if not sources:
199
+ return "No sources yet."
200
+ lines = ["| Filename | Type | Status | Words |",
201
+ "|----------|------|--------|-------|"]
202
+ for s in sources:
203
+ meta = s.get("metadata") or {}
204
+ words = meta.get("word_count", "β€”")
205
+ lines.append(f"| {s['filename']} | {s['file_type']} | {s['status']} | {words} |")
206
+ return "\n".join(lines)
207
+
208
+
209
def _load_sources(notebook_id, profile: gr.OAuthProfile | None):
    """Return the Markdown source table for the selected notebook ('' if none)."""
    from backend.ingestion_txt import list_sources

    if not notebook_id:
        return ""
    return _format_sources(list_sources(notebook_id))
215
 
216
  with gr.Blocks(
217
  title="NotebookLM Clone - Notebooks",
 
294
  outputs=[selected_notebook_id],
295
  ).then(_on_select, None, [status])
296
 
297
+ # ── Text Input Section ────────────────────────────────────
298
+ gr.Markdown("---")
299
+ gr.Markdown("## Add Text")
300
+ gr.Markdown("Select a notebook above, then paste or type your text.")
301
+
302
+ with gr.Row():
303
+ txt_title = gr.Textbox(
304
+ label="Title",
305
+ placeholder="Give this text a name (e.g. 'Lecture Notes Week 1')",
306
+ scale=1,
307
+ )
308
+
309
+ txt_input = gr.Textbox(
310
+ label="Text Content",
311
+ placeholder="Paste or type your text here...",
312
+ lines=10,
313
+ )
314
+
315
+ submit_btn = gr.Button("Save & Process", variant="primary")
316
+
317
+ upload_status = gr.Markdown("", elem_classes=["status"])
318
+ sources_display = gr.Markdown("")
319
+
320
+ submit_btn.click(
321
+ _do_upload,
322
+ inputs=[txt_input, txt_title, selected_notebook_id],
323
+ outputs=[upload_status, sources_display],
324
+ )
325
+
326
+ selected_notebook_id.change(
327
+ _load_sources,
328
+ inputs=[selected_notebook_id],
329
+ outputs=[sources_display],
330
+ )
331
+
332
  demo.launch()
backend/ingestion_txt.py ADDED
@@ -0,0 +1,273 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Text file ingestion pipeline.
3
+ Handles .txt upload β†’ extract β†’ clean β†’ save to Supabase DB + Storage.
4
+ """
5
+
6
+ import chardet
7
+ import re
8
+ from datetime import datetime
9
+ from uuid import uuid4
10
+
11
+ from backend.db import supabase
12
+ from backend.storage import save_file, get_sources_path
13
+
14
+ import os
15
+ from sentence_transformers import SentenceTransformer
16
+
17
+ # Load model once at module level (not on every call)
18
+ _model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
19
+ # ── Constants ────────────────────────────────────────────────
20
+
21
+ MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
22
+
23
+
24
+ # ── Text Processing ──────────────────────────────────────────
25
+
26
def detect_encoding(file_bytes: bytes) -> str:
    """
    Guess the character encoding of *file_bytes* using chardet.

    Returns the detected encoding name, or "utf-8" when chardet reports
    no encoding at all or its confidence is below 0.7.
    """
    guess = chardet.detect(file_bytes)
    name = guess.get("encoding") or "utf-8"
    score = guess.get("confidence") or 0

    # A low-confidence guess is worse than the safe utf-8 default.
    return name if score >= 0.7 else "utf-8"
39
+
40
+
41
def clean_text(text: str) -> str:
    """
    Clean raw extracted text.

    - Removes null bytes
    - Removes control characters, including DEL and C1 (keeps \\n and \\t)
    - Normalizes 3+ consecutive newlines down to a single blank line
    - Strips leading/trailing whitespace
    """
    # Remove null bytes
    text = text.replace("\x00", "")

    # Drop C0 controls (except \n and \t), DEL (\x7f) and C1 controls
    # (\x80-\x9f). The previous `ch >= " "` test let DEL and the C1 range
    # through even though the contract is "remove control characters".
    text = "".join(
        ch for ch in text
        if ch in ("\n", "\t") or " " <= ch < "\x7f" or ch > "\x9f"
    )

    # Normalize 3+ blank lines β†’ 2
    text = re.sub(r"\n{3,}", "\n\n", text)

    return text.strip()
62
+
63
+
64
+ # ── Supabase DB Operations ───────────────────────────────────
65
+
66
def _create_source_record(
    source_id: str,
    notebook_id: str,
    user_id: str,
    filename: str,
    storage_path: str
) -> None:
    """Insert a fresh `sources` row, marked PENDING until ingestion finishes."""
    row = {
        "id": source_id,
        "notebook_id": notebook_id,
        "user_id": user_id,
        "filename": filename,
        "file_type": "txt",
        "status": "PENDING",
        "storage_path": storage_path,
    }
    supabase.table("sources").insert(row).execute()
83
+
84
+ # ── Chunking ─────────────────────────────────────────────────
85
def chunk_text(text: str, source_id: str, notebook_id: str, filename: str = "") -> list[dict]:
    """
    Split *text* into word-based chunks of ~400 words with a 40-word overlap.

    Returns a list of chunk dicts ready for embedding/insert; each carries
    metadata (word_count, file_name, chunk_index, total_chunks).

    Fix: the previous loop emitted a final chunk made up entirely of
    overlap words whenever the word count landed on a step boundary
    (e.g. a 400-word text produced 2 chunks, the second a pure duplicate
    of the first chunk's tail).
    """
    words = text.split()
    chunk_size = 400
    overlap = 40
    step = chunk_size - overlap

    chunks: list[dict] = []
    start = 0
    while start < len(words):
        window = words[start:start + chunk_size]
        chunks.append({
            "id": str(uuid4()),
            "source_id": source_id,
            "notebook_id": notebook_id,
            "content": " ".join(window),
            "chunk_index": len(chunks),
            "metadata": {
                "word_count": len(window),
                "file_name": filename,
                "chunk_index": len(chunks),
                "total_chunks": 0,  # backfilled below once the count is known
            }
        })
        # Stop once this window already reached the end of the text;
        # otherwise the next window would contain only overlap words.
        if start + chunk_size >= len(words):
            break
        start += step

    # Backfill the real chunk count — always consistent with the loop,
    # unlike the previous up-front ceiling formula.
    for chunk in chunks:
        chunk["metadata"]["total_chunks"] = len(chunks)

    return chunks
114
+
115
+
116
+ # ── Embed + Store ─────────────────────────────────────────────
117
def embed_and_store_chunks(chunks: list[dict]) -> None:
    """
    Batch-embed chunk contents with the module-level SentenceTransformer
    and insert the rows (content + pgvector embedding) into the `chunks`
    table. No-op on an empty list.
    """
    if not chunks:
        return

    # A single encode() call over the whole batch is far cheaper than
    # embedding chunk by chunk.
    vectors = _model.encode([c["content"] for c in chunks], show_progress_bar=False)

    rows = [
        {
            "id": str(chunk["id"]),
            "source_id": str(chunk["source_id"]),
            "notebook_id": str(chunk["notebook_id"]),
            "content": chunk["content"],
            "embedding": vec.tolist(),
            "metadata": chunk["metadata"],
        }
        for chunk, vec in zip(chunks, vectors)
    ]

    try:
        supabase.table("chunks").insert(rows).execute()
        print(f"βœ… Inserted {len(rows)} chunks into pgvector")
    except Exception as e:
        print(f"❌ Failed to insert chunks: {e}")
        raise
146
+
147
def _update_source_ready(
    source_id: str,
    extracted_text: str,
    metadata: dict
) -> None:
    """Mark source as READY with extracted text and metadata."""
    from datetime import timezone  # local import: module only imports datetime

    supabase.table("sources").update({
        "status": "READY",
        "extracted_text": extracted_text,
        "metadata": metadata,
        # Timezone-aware UTC timestamp. The previous utcnow().isoformat()
        # is deprecated and carries no offset, so a timestamptz column
        # would interpret it in the session timezone instead of UTC.
        "updated_at": datetime.now(timezone.utc).isoformat(),
    }).eq("id", source_id).execute()
159
+
160
+
161
def _update_source_failed(source_id: str, error: str) -> None:
    """Mark source as FAILED with the error message stored in metadata."""
    from datetime import timezone  # local import: module only imports datetime

    supabase.table("sources").update({
        "status": "FAILED",
        "metadata": {"error": error},
        # Timezone-aware UTC timestamp; utcnow() is deprecated and naive.
        "updated_at": datetime.now(timezone.utc).isoformat(),
    }).eq("id", source_id).execute()
168
+
169
+
170
+ # ── Main Ingestion Function ──────────────────────────────────
171
+
172
def ingest_txt(
    file_bytes: bytes,
    filename: str,
    notebook_id: str,
    user_id: str
) -> dict:
    """
    Full pipeline for a .txt file upload:
    1. Validate size and extension
    2. Upload raw file to Supabase Storage
    3. Create source record (PENDING)
    4. Detect encoding + decode
    5. Clean text
    6. Update source record (READY)
    7. Chunk, embed, and store vectors

    Returns dict with source_id, filename, status, metadata,
    extracted_text and chunks_created.
    Raises ValueError on validation errors; any pipeline failure is
    re-raised after the source row is marked FAILED.
    """

    # ── Validate ─────────────────────────────────────────────
    if not file_bytes:
        raise ValueError("Empty file β€” nothing to ingest.")

    if len(file_bytes) > MAX_FILE_SIZE:
        # Derive the limit from the constant so the message can't drift
        # from the actual check (was hardcoded "10MB").
        raise ValueError(f"File too large. Max size is {MAX_FILE_SIZE // (1024 * 1024)}MB.")

    if not filename.lower().endswith(".txt"):
        raise ValueError("Only .txt files are accepted here.")

    # ── Generate IDs ─────────────────────────────────────────
    source_id = str(uuid4())

    # ── Upload raw file to Supabase Storage ──────────────────
    sources_path = get_sources_path(user_id, notebook_id)
    # Prefix with source_id so two uploads of the same name never collide.
    storage_path = f"{sources_path}/{source_id}_{filename}"

    save_file(storage_path, file_bytes)

    # ── Create DB record (PENDING) ───────────────────────────
    _create_source_record(
        source_id=source_id,
        notebook_id=notebook_id,
        user_id=user_id,
        filename=filename,
        storage_path=storage_path
    )

    # ── Extract + Clean ───────────────────────────────────────
    try:
        encoding = detect_encoding(file_bytes)
        # errors="replace" keeps ingestion alive on a bad encoding guess.
        raw_text = file_bytes.decode(encoding, errors="replace")
        cleaned_text = clean_text(raw_text)

        if not cleaned_text:
            raise ValueError("No text content found after cleaning.")

        metadata = {
            "encoding": encoding,
            "char_count": len(cleaned_text),
            "word_count": len(cleaned_text.split()),
            "line_count": cleaned_text.count("\n") + 1,
            "size_bytes": len(file_bytes),
        }

        # ── Update DB record (READY) ──────────────────────────
        _update_source_ready(source_id, cleaned_text, metadata)

        # ── Chunk + Embed + Store ─────────────────────────────
        print(f"πŸ”„ Starting chunking for {filename}...")
        chunks = chunk_text(cleaned_text, source_id, notebook_id, filename=filename)
        print(f"πŸ”„ Created {len(chunks)} chunks, embedding now...")
        embed_and_store_chunks(chunks)

        return {
            "source_id": source_id,
            "filename": filename,
            "status": "READY",
            "metadata": metadata,
            "extracted_text": cleaned_text,
            "chunks_created": len(chunks),
        }

    except Exception as e:
        print(f"❌ Ingestion failed: {e}")
        _update_source_failed(source_id, str(e))
        raise
259
+
260
+
261
+ # ── List Sources for a Notebook ──────────────────────────────
262
+
263
def list_sources(notebook_id: str) -> list[dict]:
    """Return every source row for *notebook_id*, oldest first."""
    query = (
        supabase.table("sources")
        .select("id, filename, file_type, status, metadata, created_at")
        .eq("notebook_id", notebook_id)
        .order("created_at")
    )
    response = query.execute()
    return response.data or []
db/schema.sql CHANGED
@@ -46,3 +46,21 @@ create table if not exists chunks (
46
  create index if not exists idx_chunks_notebook_id on chunks(notebook_id);
47
  -- Vector index (run after you have data; ivfflat requires rows):
48
  -- create index idx_chunks_embedding on chunks using ivfflat (embedding vector_cosine_ops) with (lists = 100);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  create index if not exists idx_chunks_notebook_id on chunks(notebook_id);
47
  -- Vector index (run after you have data; ivfflat requires rows):
48
  -- create index idx_chunks_embedding on chunks using ivfflat (embedding vector_cosine_ops) with (lists = 100);
49
+
50
+ -- sources table (ingestion pipeline)
51
+ create table if not exists sources (
52
+ id uuid primary key default gen_random_uuid(),
53
+ notebook_id uuid not null references notebooks(id) on delete cascade,
54
+ user_id text not null,
55
+ filename text not null,
56
+ file_type text not null,
57
+ status text not null default 'PENDING',
58
+ storage_path text,
59
+ extracted_text text,
60
+ metadata jsonb default '{}',
61
+ created_at timestamptz default now(),
62
+ updated_at timestamptz default now()
63
+ );
64
+ create index if not exists idx_sources_notebook_id on sources(notebook_id);
65
+ create index if not exists idx_sources_user_id on sources(user_id);
66
+ create index if not exists idx_sources_status on sources(status);
requirements.txt CHANGED
@@ -3,3 +3,5 @@ huggingface_hub==0.24.7
3
  supabase>=2.0.0
4
  python-dotenv>=1.0.0
5
  realtime==2.3.0
 
 
 
3
  supabase>=2.0.0
4
  python-dotenv>=1.0.0
5
  realtime==2.3.0
6
+ chardet>=5.0.0
7
+ sentence-transformers>=2.0.0