""" backend/workers/ingestion_worker.py Celery task(s) for document ingestion: - extract_text_from_file: supports PDF, DOCX, TXT, or raw text - chunk_text: simple sentence-based chunker with overlap - embed_chunks: uses Sentence-Transformers (all-MiniLM-L6-v2) if available, otherwise uses a fallback hash-based vector (deterministic) for dev - write_embeddings_to_supabase: stores chunk metadata and embedding into Supabase/pgvector - ingest_document task: orchestrates the end-to-end flow Notes: - Expects SUPABASE_URL and SUPABASE_SERVICE_KEY in environment for production. - Uses CELERY broker settings from celeryconfig.py """ import os import math import time import logging from typing import List, Dict, Optional from celery import shared_task from pathlib import Path # Try to import sentence-transformers; fallback to hashlib if not present try: from sentence_transformers import SentenceTransformer EMBED_MODEL = SentenceTransformer("all-MiniLM-L6-v2") except Exception: EMBED_MODEL = None # Try to import supabase client. If missing, fallback to using psycopg2/pgconnection if env provided. SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY") try: from supabase import create_client, Client as SupabaseClient if SUPABASE_URL and SUPABASE_SERVICE_KEY: SUPABASE = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY) else: SUPABASE = None except Exception: SUPABASE = None # Try to import a robust text extractor lib. Use python-docx + plain open as fallback. try: import textract # optional powerful extractor except Exception: textract = None import hashlib import json import re logger = logging.getLogger("ingestion_worker") logger.setLevel(os.getenv("LOG_LEVEL", "INFO")) # ----------------------- # Utilities # ----------------------- def extract_text_from_path(path: str) -> str: """ Extract text from a given file path. Supports: .txt, .md, .pdf (via textract if available), .docx (basic fallback) """ p = Path(path) if not p.exists(): raise FileNotFoundError(f"File not found: {path}") suffix = p.suffix.lower() if suffix in [".txt", ".md"]: return p.read_text(encoding="utf-8", errors="ignore") if textract is not None: try: raw = textract.process(str(p)) return raw.decode("utf-8", errors="ignore") except Exception as e: logger.warning("textract failed, falling back to basic read (%s)", e) # basic docx fallback if suffix == ".docx": try: from docx import Document as DocxDocument doc = DocxDocument(str(p)) return "\n".join(par.text for par in doc.paragraphs) except Exception: logger.exception("docx extraction failed; returning empty text") return "" # last fallback: binary read as text try: return p.read_text(encoding="utf-8", errors="ignore") except Exception: logger.exception("unknown file type and text read failed") return "" def simple_chunk_text(text: str, chunk_size: int = 800, chunk_overlap: int = 100) -> List[str]: """ Very simple chunker that splits on sentences up to roughly chunk_size tokens/characters. chunk_size is approximate (characters). Overlap is characters overlapped between chunks. """ clean = re.sub(r"\s+", " ", text).strip() if not clean: return [] chunks = [] start = 0 n = len(clean) while start < n: end = min(n, start + chunk_size) # try to expand to sentence boundary if end < n: m = clean.rfind(".", start, end) if m != -1 and m - start > chunk_size // 3: end = m + 1 chunk = clean[start:end].strip() if chunk: chunks.append(chunk) start = max(end - chunk_overlap, end) return chunks def embed_texts(texts: List[str]) -> List[List[float]]: """ Use SentenceTransformer model if available; otherwise use a deterministic hash-based fallback vector. The fallback returns a small vector (e.g. 64-d) computed from sha256 chunks — only for local dev/testing. """ if EMBED_MODEL is not None: vectors = EMBED_MODEL.encode(texts, show_progress_bar=False).tolist() return vectors # fallback vecs = [] for t in texts: h = hashlib.sha256(t.encode("utf-8")).digest() # convert to floats in range [-1,1] vals = [] for i in range(0, min(len(h), 64)): vals.append(((h[i] / 255.0) * 2.0) - 1.0) # pad to 64 while len(vals) < 64: vals.append(0.0) vecs.append(vals) return vecs def upsert_embeddings_supabase(tenant_id: str, doc_id: str, chunks: List[Dict]): """ Upsert chunk records into Supabase table `embeddings` (or 'rag_embeddings'). Expected schema (example): - id (uuid) - tenant_id (text) - doc_id (text) - chunk_index (int) - chunk_text (text) - metadata (jsonb) - embedding (vector) -> in pgvector column - created_at (timestamp) This function attempts to use supabase-py. If not available, logs the JSON for manual insertion. """ if SUPABASE is None: logger.warning("Supabase client not configured. Logging chunks for manual insertion.") logger.debug("Chunks sample: %s", json.dumps(chunks[:3], indent=2)) return {"status": "logged", "count": len(chunks)} table_name = os.getenv("SUPABASE_EMBED_TABLE", "rag_embeddings") # Build rows rows = [] ts = int(time.time()) for c in chunks: rows.append({ "tenant_id": tenant_id, "doc_id": doc_id, "chunk_index": c.get("index"), "chunk_text": c.get("text"), "metadata": c.get("metadata", {}), "embedding": c.get("embedding"), "created_at": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(ts)), }) # Use upsert try: res = SUPABASE.table(table_name).upsert(rows).execute() logger.info("Upserted %d embeddings to Supabase table %s", len(rows), table_name) return {"status": "ok", "count": len(rows), "result": res} except Exception as e: logger.exception("Failed to upsert embeddings to Supabase: %s", e) return {"status": "error", "error": str(e)} # ----------------------- # Celery task(s) # ----------------------- # We import app lazily to avoid circular imports — celery app is in celeryconfig.py try: from backend.workers.celeryconfig import celery_app except Exception: # If import fails, create a simple local Celery-like decorator using dummy shared_task celery_app = None def task_decorator(func): if celery_app is not None: return celery_app.task(func) else: # no-op: run synchronously for dev/testing def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper @task_decorator def ingest_document(tenant_id: str, doc_id: str, file_path: Optional[str] = None, raw_text: Optional[str] = None, source_url: Optional[str] = None, chunk_size: int = 800, chunk_overlap: int = 100): """ End-to-end ingestion task. - if raw_text provided, use it - otherwise extract from file_path - chunk, embed, and upsert into Supabase Returns a dict with status and counts. """ start_ts = time.time() logger.info("ingest_document started: tenant=%s doc_id=%s", tenant_id, doc_id) try: if raw_text: text = raw_text elif file_path: text = extract_text_from_path(file_path) elif source_url: # basic fetch for simple pages import requests r = requests.get(source_url, timeout=10) r.raise_for_status() text = re.sub(r"\s+", " ", r.text) else: raise ValueError("Either raw_text, file_path, or source_url must be provided.") if not text or text.strip() == "": logger.warning("No text extracted for doc_id=%s", doc_id) return {"status": "empty", "chunks": 0} chunks_text = simple_chunk_text(text, chunk_size=chunk_size, chunk_overlap=chunk_overlap) logger.info("Extracted %d chunks for doc=%s", len(chunks_text), doc_id) # Prepare chunk metadata chunks = [] for i, t in enumerate(chunks_text): chunks.append({"index": i, "text": t, "metadata": {"source_url": source_url}}) # embed in batches batch_size = 32 embeddings = [] for i in range(0, len(chunks), batch_size): batch_texts = [c["text"] for c in chunks[i:i+batch_size]] batch_emb = embed_texts(batch_texts) embeddings.extend(batch_emb) # attach embeddings for i, c in enumerate(chunks): c["embedding"] = embeddings[i] # upsert into supabase res = upsert_embeddings_supabase(tenant_id, doc_id, chunks) elapsed = time.time() - start_ts logger.info("ingest_document finished: tenant=%s doc=%s chunks=%d elapsed=%.2fs", tenant_id, doc_id, len(chunks), elapsed) return {"status": "ok", "chunks": len(chunks), "supabase": res} except Exception as exc: logger.exception("ingest_document failed: %s", exc) return {"status": "error", "error": str(exc)}