# IntegraChat — backend/workers/ingestion_worker.py
# Upstream commit aa63765: "feat: add knowledge base management and analytics dashboard"
"""
backend/workers/ingestion_worker.py
Celery task(s) for document ingestion:
- extract_text_from_file: supports PDF, DOCX, TXT, or raw text
- chunk_text: simple sentence-based chunker with overlap
- embed_chunks: uses Sentence-Transformers (all-MiniLM-L6-v2) if available,
otherwise uses a fallback hash-based vector (deterministic) for dev
- write_embeddings_to_supabase: stores chunk metadata and embedding into Supabase/pgvector
- ingest_document task: orchestrates the end-to-end flow
Notes:
- Expects SUPABASE_URL and SUPABASE_SERVICE_KEY in environment for production.
- Uses CELERY broker settings from celeryconfig.py
"""
import os
import math
import time
import logging
from typing import List, Dict, Optional
from celery import shared_task
from pathlib import Path
# Try to import sentence-transformers; fallback to hashlib if not present
# Try to import sentence-transformers; fallback to hashlib if not present.
# NOTE: loading the model here means the first import of this module is slow
# (model download/load happens at import time, once per worker process).
try:
    from sentence_transformers import SentenceTransformer
    EMBED_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
except Exception:
    # Any failure (missing package, download error) silently selects the
    # hash-based fallback in embed_texts() — acceptable for dev only.
    EMBED_MODEL = None
# Try to import supabase client. If missing, fallback to using psycopg2/pgconnection if env provided.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY")
try:
    from supabase import create_client, Client as SupabaseClient
    # Only instantiate a client when both credentials are present; otherwise
    # upsert_embeddings_supabase() degrades to log-only mode.
    if SUPABASE_URL and SUPABASE_SERVICE_KEY:
        SUPABASE = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
    else:
        SUPABASE = None
except Exception:
    SUPABASE = None
# Try to import a robust text extractor lib. Use python-docx + plain open as fallback.
try:
    import textract # optional powerful extractor
except Exception:
    textract = None
import hashlib
import json
import re
logger = logging.getLogger("ingestion_worker")
# logging accepts level names as strings ("DEBUG", "INFO", ...), so the env
# value can be passed through directly.
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
# -----------------------
# Utilities
# -----------------------
def extract_text_from_path(path: str) -> str:
    """
    Extract text from a given file path.

    Supports: .txt, .md (plain read), .pdf and other formats via textract when
    installed, and .docx via python-docx as a fallback. Returns "" when
    extraction fails; raises FileNotFoundError when the path does not exist.
    """
    source = Path(path)
    if not source.exists():
        raise FileNotFoundError(f"File not found: {path}")
    ext = source.suffix.lower()
    # Plain-text formats need no extraction library at all.
    if ext in (".txt", ".md"):
        return source.read_text(encoding="utf-8", errors="ignore")
    # Prefer textract when available; it handles PDF and many office formats.
    if textract is not None:
        try:
            return textract.process(str(source)).decode("utf-8", errors="ignore")
        except Exception as e:
            logger.warning("textract failed, falling back to basic read (%s)", e)
    # basic docx fallback via python-docx
    if ext == ".docx":
        try:
            from docx import Document as DocxDocument
            document = DocxDocument(str(source))
            return "\n".join(par.text for par in document.paragraphs)
        except Exception:
            logger.exception("docx extraction failed; returning empty text")
            return ""
    # last fallback: read the bytes as (lossy) utf-8 text
    try:
        return source.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        logger.exception("unknown file type and text read failed")
        return ""
def simple_chunk_text(text: str, chunk_size: int = 800, chunk_overlap: int = 100) -> List[str]:
    """
    Split text into roughly chunk_size-character chunks with chunk_overlap
    characters of overlap between consecutive chunks.

    Whitespace runs are collapsed to single spaces first. When possible a
    chunk is cut at the last sentence boundary ('.') seen in its window, as
    long as that keeps the chunk longer than chunk_size // 3.

    Args:
        text: Source text (may be empty or whitespace-only).
        chunk_size: Approximate maximum chunk length in characters.
        chunk_overlap: Characters repeated from the end of one chunk at the
            start of the next.

    Returns:
        List of non-empty chunk strings; empty list for blank input.
    """
    clean = re.sub(r"\s+", " ", text).strip()
    if not clean:
        return []
    chunks: List[str] = []
    start = 0
    n = len(clean)
    while start < n:
        end = min(n, start + chunk_size)
        # Prefer a sentence boundary, but only if that keeps the chunk at
        # least chunk_size // 3 long (avoids degenerate tiny chunks).
        if end < n:
            m = clean.rfind(".", start, end)
            if m != -1 and m - start > chunk_size // 3:
                end = m + 1
        chunk = clean[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= n:
            # Reached the end of the text; stepping back by the overlap here
            # would only emit a redundant tail chunk.
            break
        # BUG FIX: the original computed max(end - chunk_overlap, end), which
        # is always `end`, so the advertised overlap was never applied. Step
        # back by chunk_overlap while still guaranteeing forward progress.
        start = max(end - chunk_overlap, start + 1)
    return chunks
def embed_texts(texts: List[str]) -> List[List[float]]:
    """
    Embed a batch of strings.

    Uses the SentenceTransformer model when it loaded at import time;
    otherwise falls back to a deterministic sha256-derived 64-d vector.
    The fallback is NOT a semantic embedding — local dev/testing only.
    """
    if EMBED_MODEL is not None:
        return EMBED_MODEL.encode(texts, show_progress_bar=False).tolist()
    # Fallback: map each sha256 digest byte into [-1, 1], then zero-pad to 64 dims
    # (sha256 yields 32 bytes, so the upper half is always zeros).
    fallback_vectors = []
    for text in texts:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        vector = [((byte / 255.0) * 2.0) - 1.0 for byte in digest[:64]]
        vector += [0.0] * (64 - len(vector))
        fallback_vectors.append(vector)
    return fallback_vectors
def upsert_embeddings_supabase(tenant_id: str, doc_id: str, chunks: List[Dict]):
    """
    Upsert chunk records into the Supabase embeddings table.

    Target table defaults to 'rag_embeddings' and can be overridden with the
    SUPABASE_EMBED_TABLE env var. Expected columns: tenant_id, doc_id,
    chunk_index, chunk_text, metadata (jsonb), embedding (pgvector vector),
    created_at.

    When no Supabase client is configured, the chunks are only logged so a dev
    environment can still run the pipeline end-to-end.

    Returns a status dict: {"status": "logged"|"ok"|"error", ...}.
    """
    if SUPABASE is None:
        logger.warning("Supabase client not configured. Logging chunks for manual insertion.")
        logger.debug("Chunks sample: %s", json.dumps(chunks[:3], indent=2))
        return {"status": "logged", "count": len(chunks)}
    table_name = os.getenv("SUPABASE_EMBED_TABLE", "rag_embeddings")
    # One shared timestamp for the whole batch, formatted as UTC.
    created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(int(time.time())))
    rows = [
        {
            "tenant_id": tenant_id,
            "doc_id": doc_id,
            "chunk_index": chunk.get("index"),
            "chunk_text": chunk.get("text"),
            "metadata": chunk.get("metadata", {}),
            "embedding": chunk.get("embedding"),
            "created_at": created_at,
        }
        for chunk in chunks
    ]
    try:
        res = SUPABASE.table(table_name).upsert(rows).execute()
        logger.info("Upserted %d embeddings to Supabase table %s", len(rows), table_name)
        return {"status": "ok", "count": len(rows), "result": res}
    except Exception as e:
        logger.exception("Failed to upsert embeddings to Supabase: %s", e)
        return {"status": "error", "error": str(e)}
# -----------------------
# Celery task(s)
# -----------------------
# We import app lazily to avoid circular imports — celery app is in celeryconfig.py
try:
    from backend.workers.celeryconfig import celery_app
except Exception:
    # If import fails, create a simple local Celery-like decorator using dummy shared_task
    # NOTE(review): any import error (not just a missing module) is swallowed
    # here, so a broken celeryconfig silently degrades ingestion to synchronous
    # in-process execution via task_decorator below — confirm this is intended.
    celery_app = None
def task_decorator(func):
    """
    Register *func* as a Celery task when the app is available; otherwise
    return a synchronous pass-through wrapper so dev/testing code can call
    the task directly.
    """
    if celery_app is not None:
        return celery_app.task(func)
    # no-op fallback: run synchronously for dev/testing.
    # functools.wraps preserves func's __name__/__doc__ so logs, debuggers,
    # and any registry keyed on the task name still see the real function.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper
@task_decorator
def ingest_document(tenant_id: str, doc_id: str,
                    file_path: Optional[str] = None,
                    raw_text: Optional[str] = None,
                    source_url: Optional[str] = None,
                    chunk_size: int = 800,
                    chunk_overlap: int = 100):
    """
    End-to-end ingestion task.

    Resolves the document text from (in priority order) raw_text, file_path,
    or source_url, then chunks it, embeds every chunk in batches, and upserts
    the result into Supabase. Returns a dict with status and counts; never
    raises — failures come back as {"status": "error", ...}.
    """
    started = time.time()
    logger.info("ingest_document started: tenant=%s doc_id=%s", tenant_id, doc_id)
    try:
        # -- resolve the raw text ------------------------------------------
        if raw_text:
            text = raw_text
        elif file_path:
            text = extract_text_from_path(file_path)
        elif source_url:
            # basic fetch for simple pages
            import requests
            response = requests.get(source_url, timeout=10)
            response.raise_for_status()
            text = re.sub(r"\s+", " ", response.text)
        else:
            raise ValueError("Either raw_text, file_path, or source_url must be provided.")
        if not text or text.strip() == "":
            logger.warning("No text extracted for doc_id=%s", doc_id)
            return {"status": "empty", "chunks": 0}
        # -- chunk ---------------------------------------------------------
        pieces = simple_chunk_text(text, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        logger.info("Extracted %d chunks for doc=%s", len(pieces), doc_id)
        records = [
            {"index": idx, "text": piece, "metadata": {"source_url": source_url}}
            for idx, piece in enumerate(pieces)
        ]
        # -- embed in batches of 32 ----------------------------------------
        vectors: List[List[float]] = []
        for offset in range(0, len(records), 32):
            batch = [record["text"] for record in records[offset:offset + 32]]
            vectors.extend(embed_texts(batch))
        for record, vector in zip(records, vectors):
            record["embedding"] = vector
        # -- persist -------------------------------------------------------
        res = upsert_embeddings_supabase(tenant_id, doc_id, records)
        elapsed = time.time() - started
        logger.info("ingest_document finished: tenant=%s doc=%s chunks=%d elapsed=%.2fs",
                    tenant_id, doc_id, len(records), elapsed)
        return {"status": "ok", "chunks": len(records), "supabase": res}
    except Exception as exc:
        logger.exception("ingest_document failed: %s", exc)
        return {"status": "error", "error": str(exc)}