# Author: Santiago Casas
# Fixes for better inference, model selection and ingestion (commit e456740)
import hashlib
import logging
import mimetypes
from io import BytesIO
import logfire
from surrealdb import RecordID
from kaig.db import DB
from kaig.definitions import OriginalDocument
from ..conversion import ConvertersFactory
from ..conversion.definitions import (
ChunkWithMetadata,
DocumentStreamGeneric,
)
from ..definitions import Chunk, Tables
from ..utils import is_chunk_empty
logger = logging.getLogger(__name__)
def chunking_handler(db: DB, document: OriginalDocument) -> None:
    """Convert *document* into chunks, embed them and insert into the DB.

    Tries each converter returned by ``ConvertersFactory`` in order until one
    succeeds, then MD5-hashes every non-empty chunk (hash doubles as the
    record id for deduplication) and stores it via ``db.embed_and_insert``.

    Args:
        db: Database handle providing ``embedder``, ``exists`` and
            ``embed_and_insert``.
        document: The original uploaded document (filename, raw bytes,
            content type, record id).

    Raises:
        RuntimeError: If every converter fails (chained to the last failure)
            or if no converter is available for the content type.
        Exception: Re-raised from ``db.embed_and_insert`` on embedding errors.
    """
    with logfire.span("Chunking {doc=}", doc=document.id):
        doc_stream = DocumentStreamGeneric(
            name=document.filename, stream=BytesIO(document.file)
        )
        embedding_model = (
            db.embedder.model_name if db.embedder else "text-embedding-3-small"
        )
        # Browsers/clients often send a generic content type; fall back to
        # guessing from the filename extension so we pick the right converter.
        content_type = document.content_type
        if not content_type or content_type == "application/octet-stream":
            guessed, _ = mimetypes.guess_type(document.filename)
            if guessed:
                content_type = guessed
        converters = ConvertersFactory.get_converters(content_type, embedding_model)
        result = None
        last_error: BaseException | None = None
        for converter in converters:
            try:
                result = converter.convert_and_chunk(doc_stream)
                logger.info("Using %s", converter.__class__.__name__)
                break
            except (KeyboardInterrupt, SystemExit):
                # Never swallow interpreter-shutdown signals.
                raise
            except BaseException as e:
                # BaseException on purpose: some converters (e.g. pyo3-based)
                # raise foreign BaseException types such as panics.
                last_error = e
                logger.warning(
                    "Converter %s failed for document %s: %s",
                    converter.__class__.__name__,
                    document.id,
                    e,
                )
        if result is None:
            logger.error("Error chunking document %s: %s", document.id, last_error)
            if last_error:
                # Normalize foreign BaseException types (e.g. pyo3 panics)
                # while preserving the original cause for debugging.
                raise RuntimeError(str(last_error)) from last_error
            raise RuntimeError("No converters available")
        for i, chunk in enumerate(result.chunks):
            chunk_text = (
                chunk.content if isinstance(chunk, ChunkWithMetadata) else chunk
            )
            logger.info("Processing chunk: %s", chunk_text[:60])
            if is_chunk_empty(chunk_text):
                continue
            # Content hash doubles as the record id, so identical chunks
            # (within or across documents) are stored only once.
            chunk_hash = hashlib.md5(chunk_text.encode("utf-8")).hexdigest()
            chunk_id = RecordID(Tables.chunk.value, chunk_hash)
            # skip if it already exists
            if db.exists(chunk_id):
                continue
            # ------------------------------------------------------------------
            # -- Embed chunks and insert
            doc = Chunk(
                content=chunk_text,
                id=chunk_id,
                doc=document.id,
                index=i,
                metadata=chunk.metadata
                if isinstance(chunk, ChunkWithMetadata)
                else None,
            )
            try:
                _ = db.embed_and_insert(doc, table=Tables.chunk.value, id=chunk_hash)
            except Exception:
                logger.exception(
                    "Error embedding chunk %s with len=%d",
                    chunk_id,
                    len(doc.content),
                )
                # Bare raise preserves the original traceback.
                raise
        logger.info("Finished chunking!")