"""
backend/workers/ingestion_worker.py

Celery task(s) for document ingestion:
- extract_text_from_path: extracts text from PDF, DOCX, TXT/MD files (or raw text is passed in directly)
- simple_chunk_text: simple sentence-aware chunker with character overlap
- embed_texts: uses Sentence-Transformers (all-MiniLM-L6-v2) if available,
  otherwise a deterministic hash-based fallback vector for dev
- upsert_embeddings_supabase: stores chunk metadata and embeddings in Supabase/pgvector
- ingest_document: Celery task orchestrating the end-to-end flow

Notes:
- Expects SUPABASE_URL and SUPABASE_SERVICE_KEY in the environment for production.
- Uses Celery broker settings from celeryconfig.py.
"""
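# Typical invocation from an API layer (illustrative only; assumes the Celery app defined
# in celeryconfig.py is configured with a broker -- without one, call ingest_document
# directly and it runs synchronously):
#
#   ingest_document.delay(tenant_id="t1", doc_id="d1", file_path="/tmp/report.pdf")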
import hashlib
import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Dict, List, Optional

# Try to import sentence-transformers; fall back to a hash-based embedding if not present.
try:
    from sentence_transformers import SentenceTransformer
    EMBED_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
except Exception:
    EMBED_MODEL = None
# Try to import the supabase client. If it is missing or unconfigured, SUPABASE stays
# None and chunks are logged for manual insertion instead of being upserted.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY")
try:
    from supabase import create_client
    if SUPABASE_URL and SUPABASE_SERVICE_KEY:
        SUPABASE = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
    else:
        SUPABASE = None
except Exception:
    SUPABASE = None
# Try to import a robust text extractor lib. Use python-docx + plain reads as fallback.
try:
    import textract  # optional, more powerful extractor (PDF and more)
except Exception:
    textract = None

logger = logging.getLogger("ingestion_worker")
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))

# -----------------------
# Utilities
# -----------------------


def extract_text_from_path(path: str) -> str:
    """
    Extract text from a given file path.

    Supports: .txt, .md, .pdf (via textract if available), .docx (basic fallback).
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"File not found: {path}")
    suffix = p.suffix.lower()
    if suffix in (".txt", ".md"):
        return p.read_text(encoding="utf-8", errors="ignore")
    if textract is not None:
        try:
            raw = textract.process(str(p))
            return raw.decode("utf-8", errors="ignore")
        except Exception as e:
            logger.warning("textract failed, falling back to basic read (%s)", e)
    # Basic .docx fallback via python-docx.
    if suffix == ".docx":
        try:
            from docx import Document as DocxDocument
            doc = DocxDocument(str(p))
            return "\n".join(par.text for par in doc.paragraphs)
        except Exception:
            logger.exception("docx extraction failed; returning empty text")
            return ""
    # Last resort: read the bytes as text.
    try:
        return p.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        logger.exception("unknown file type and text read failed")
        return ""


def simple_chunk_text(text: str, chunk_size: int = 800, chunk_overlap: int = 100) -> List[str]:
    """
    Very simple chunker that splits text into windows of roughly chunk_size characters,
    preferring to end each chunk on a sentence boundary. chunk_overlap is the number of
    characters shared between consecutive chunks.
    """
    clean = re.sub(r"\s+", " ", text).strip()
    if not clean:
        return []
    chunks = []
    start = 0
    n = len(clean)
    while start < n:
        end = min(n, start + chunk_size)
        # Try to pull the cut back to a sentence boundary, but only if the resulting
        # chunk would still be reasonably large.
        if end < n:
            m = clean.rfind(".", start, end)
            if m != -1 and m - start > chunk_size // 3:
                end = m + 1
        chunk = clean[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= n:
            break
        # Step back by chunk_overlap so consecutive chunks share context, while
        # guaranteeing forward progress so the loop always terminates.
        start = max(end - chunk_overlap, start + 1)
    return chunks
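
# Illustrative behavior (hypothetical values, handy as a quick sanity check):
#
#   >>> parts = simple_chunk_text("A. " * 500, chunk_size=100, chunk_overlap=20)
#   >>> all(len(p) <= 100 for p in parts)
#   True
#
# With chunk_overlap=20, roughly the last 20 characters of each chunk reappear at the
# start of the next one, which helps retrieval when an answer spans a chunk boundary.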


def embed_texts(texts: List[str]) -> List[List[float]]:
    """
    Use the SentenceTransformer model if available; otherwise fall back to a deterministic
    hash-based vector. The fallback maps each sha256 digest byte to a float in [-1, 1] and
    zero-pads to 64 dimensions; it is only for local dev/testing, not real retrieval.
    """
    if EMBED_MODEL is not None:
        return EMBED_MODEL.encode(texts, show_progress_bar=False).tolist()
    # Fallback: deterministic pseudo-embeddings derived from sha256.
    vecs = []
    for t in texts:
        h = hashlib.sha256(t.encode("utf-8")).digest()  # 32 bytes
        # Map each digest byte to a float in [-1, 1].
        vals = [((b / 255.0) * 2.0) - 1.0 for b in h]
        # Zero-pad to a fixed 64-d vector.
        vals.extend(0.0 for _ in range(64 - len(vals)))
        vecs.append(vals)
    return vecs
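
# The upsert below assumes a pgvector-backed table roughly like the following
# (illustrative DDL only; the actual table name comes from SUPABASE_EMBED_TABLE):
#
#   create extension if not exists vector;
#   create table if not exists rag_embeddings (
#       id          uuid primary key default gen_random_uuid(),
#       tenant_id   text not null,
#       doc_id      text not null,
#       chunk_index int not null,
#       chunk_text  text,
#       metadata    jsonb default '{}'::jsonb,
#       embedding   vector(384),  -- all-MiniLM-L6-v2 produces 384-d vectors
#       created_at  timestamptz default now()
#   );
#
# Note the dev hash fallback emits 64-d vectors, which will not fit a vector(384)
# column; use the real model against a schema like this.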


def upsert_embeddings_supabase(tenant_id: str, doc_id: str, chunks: List[Dict]):
    """
    Upsert chunk records into the Supabase embeddings table (SUPABASE_EMBED_TABLE,
    default "rag_embeddings").

    Expected schema (example):
      - id (uuid)
      - tenant_id (text)
      - doc_id (text)
      - chunk_index (int)
      - chunk_text (text)
      - metadata (jsonb)
      - embedding (vector) -> pgvector column
      - created_at (timestamp)

    Uses supabase-py if configured; otherwise logs the chunks for manual insertion.
    """
    if SUPABASE is None:
        logger.warning("Supabase client not configured. Logging chunks for manual insertion.")
        logger.debug("Chunks sample: %s", json.dumps(chunks[:3], indent=2))
        return {"status": "logged", "count": len(chunks)}
    table_name = os.getenv("SUPABASE_EMBED_TABLE", "rag_embeddings")
    # Build rows for the upsert.
    ts = int(time.time())
    rows = []
    for c in chunks:
        rows.append({
            "tenant_id": tenant_id,
            "doc_id": doc_id,
            "chunk_index": c.get("index"),
            "chunk_text": c.get("text"),
            "metadata": c.get("metadata", {}),
            "embedding": c.get("embedding"),
            "created_at": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(ts)),
        })
    try:
        res = SUPABASE.table(table_name).upsert(rows).execute()
        logger.info("Upserted %d embeddings to Supabase table %s", len(rows), table_name)
        return {"status": "ok", "count": len(rows), "result": res}
    except Exception as e:
        logger.exception("Failed to upsert embeddings to Supabase: %s", e)
        return {"status": "error", "error": str(e)}


# -----------------------
# Celery task(s)
# -----------------------
# Import the Celery app lazily to avoid circular imports; the app lives in celeryconfig.py.
try:
    from backend.workers.celeryconfig import celery_app
except Exception:
    # No Celery app available (e.g. during local dev/testing); tasks run synchronously.
    celery_app = None


def task_decorator(func):
    """Register func as a Celery task when an app is configured; otherwise run it inline."""
    if celery_app is not None:
        return celery_app.task(func)

    # No Celery app: return a plain wrapper that executes synchronously for dev/testing.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


@task_decorator
def ingest_document(tenant_id: str, doc_id: str,
                    file_path: Optional[str] = None,
                    raw_text: Optional[str] = None,
                    source_url: Optional[str] = None,
                    chunk_size: int = 800,
                    chunk_overlap: int = 100):
    """
    End-to-end ingestion task.

    - if raw_text is provided, use it
    - otherwise extract text from file_path, or fetch it from source_url
    - chunk, embed, and upsert into Supabase

    Returns a dict with status and counts.
    """
    start_ts = time.time()
    logger.info("ingest_document started: tenant=%s doc_id=%s", tenant_id, doc_id)
    try:
        if raw_text:
            text = raw_text
        elif file_path:
            text = extract_text_from_path(file_path)
        elif source_url:
            # Basic fetch for simple pages; no HTML parsing, just whitespace normalization.
            import requests
            r = requests.get(source_url, timeout=10)
            r.raise_for_status()
            text = re.sub(r"\s+", " ", r.text)
        else:
            raise ValueError("Either raw_text, file_path, or source_url must be provided.")
        if not text or not text.strip():
            logger.warning("No text extracted for doc_id=%s", doc_id)
            return {"status": "empty", "chunks": 0}
        chunks_text = simple_chunk_text(text, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        logger.info("Extracted %d chunks for doc=%s", len(chunks_text), doc_id)
        # Prepare chunk metadata.
        chunks = []
        for i, t in enumerate(chunks_text):
            chunks.append({"index": i, "text": t, "metadata": {"source_url": source_url}})
        # Embed in batches to bound memory use.
        batch_size = 32
        embeddings = []
        for i in range(0, len(chunks), batch_size):
            batch_texts = [c["text"] for c in chunks[i:i + batch_size]]
            embeddings.extend(embed_texts(batch_texts))
        # Attach embeddings to their chunks.
        for i, c in enumerate(chunks):
            c["embedding"] = embeddings[i]
        # Upsert into Supabase (or log if unconfigured).
        res = upsert_embeddings_supabase(tenant_id, doc_id, chunks)
        elapsed = time.time() - start_ts
        logger.info("ingest_document finished: tenant=%s doc=%s chunks=%d elapsed=%.2fs",
                    tenant_id, doc_id, len(chunks), elapsed)
        return {"status": "ok", "chunks": len(chunks), "supabase": res}
    except Exception as exc:
        logger.exception("ingest_document failed: %s", exc)
        return {"status": "error", "error": str(exc)}
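

# Minimal local smoke test (illustrative): with no Celery app configured the task runs
# synchronously, and with the optional deps absent it uses hash embeddings and logs the
# chunks instead of writing to Supabase. The ids below are hypothetical.
if __name__ == "__main__":
    logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
    result = ingest_document(
        tenant_id="dev-tenant",
        doc_id="dev-doc-1",
        raw_text=(
            "Supabase stores each chunk alongside its embedding. "
            "pgvector makes those embeddings queryable by similarity."
        ),
        chunk_size=80,
        chunk_overlap=20,
    )
    print("status:", result.get("status"), "chunks:", result.get("chunks"))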