Spaces:
Sleeping
Sleeping
| import hashlib | |
| import logging | |
| import mimetypes | |
| from io import BytesIO | |
| import logfire | |
| from surrealdb import RecordID | |
| from kaig.db import DB | |
| from kaig.definitions import OriginalDocument | |
| from ..conversion import ConvertersFactory | |
| from ..conversion.definitions import ( | |
| ChunkWithMetadata, | |
| DocumentStreamGeneric, | |
| ) | |
| from ..definitions import Chunk, Tables | |
| from ..utils import is_chunk_empty | |
| logger = logging.getLogger(__name__) | |
def chunking_handler(db: DB, document: OriginalDocument) -> None:
    """Convert *document* into chunks, embed each chunk, and insert into *db*.

    Converters from ``ConvertersFactory`` are tried in order until one
    succeeds. Chunks that are empty, or whose content (keyed by the MD5 of
    the chunk text) is already stored, are skipped.

    Args:
        db: Database handle providing ``exists`` and ``embed_and_insert``.
        document: The uploaded document (filename, raw bytes, content type).

    Raises:
        RuntimeError: when every converter failed (chained to the last
            converter error) or no converter was available at all.
        Exception: re-raised from ``db.embed_and_insert`` on embedding failure.
    """
    with logfire.span("Chunking {doc=}", doc=document.id):
        doc_stream = DocumentStreamGeneric(
            name=document.filename, stream=BytesIO(document.file)
        )
        embedding_model = (
            db.embedder.model_name if db.embedder else "text-embedding-3-small"
        )
        content_type = document.content_type
        # Clients often upload with a missing or generic content type; fall
        # back to guessing from the file extension so converter selection
        # has something useful to work with.
        if not content_type or content_type == "application/octet-stream":
            guessed, _ = mimetypes.guess_type(document.filename)
            if guessed:
                content_type = guessed
        converters = ConvertersFactory.get_converters(content_type, embedding_model)
        result = None
        last_error: BaseException | None = None
        for converter in converters:
            try:
                result = converter.convert_and_chunk(doc_stream)
                logger.info("Using %s", converter.__class__.__name__)
                break
            # ``BaseException`` on purpose: some converters surface foreign
            # BaseException subclasses (e.g. pyo3 panics) that an
            # ``except Exception`` would let escape. Genuine shutdown
            # signals are re-raised immediately.
            except BaseException as e:
                if isinstance(e, (KeyboardInterrupt, SystemExit)):
                    raise
                last_error = e
                logger.warning(
                    "Converter %s failed for document %s: %s",
                    converter.__class__.__name__,
                    document.id,
                    e,
                )
        if result is None:
            logger.error("Error chunking document %s: %s", document.id, last_error)
            if last_error:
                # Normalize foreign BaseException types (e.g. pyo3 panics)
                # into a plain RuntimeError; chain the original as __cause__
                # so the real failure stays visible in tracebacks.
                raise RuntimeError(str(last_error)) from last_error
            raise RuntimeError("No converters available")
        for i, chunk in enumerate(result.chunks):
            chunk_text = (
                chunk.content if isinstance(chunk, ChunkWithMetadata) else chunk
            )
            # Lazy %-args: formatting is skipped when INFO is disabled.
            logger.info("Processing chunk: %s", chunk_text[:60])
            if is_chunk_empty(chunk_text):
                continue
            # Content-addressed id: identical chunk text dedupes storage.
            # (MD5 is used only as a dedup key, not for security.)
            chunk_hash = hashlib.md5(chunk_text.encode("utf-8")).hexdigest()
            chunk_id = RecordID(Tables.chunk.value, chunk_hash)
            # Skip chunks already embedded and stored.
            if db.exists(chunk_id):
                continue
            # ------------------------------------------------------------------
            # -- Embed chunks and insert
            doc = Chunk(
                content=chunk_text,
                id=chunk_id,
                doc=document.id,
                index=i,
                metadata=chunk.metadata
                if isinstance(chunk, ChunkWithMetadata)
                else None,
            )
            try:
                _ = db.embed_and_insert(doc, table=Tables.chunk.value, id=chunk_hash)
            except Exception as e:
                logger.error(
                    "Error embedding chunk %s with len=%s: %s %s",
                    chunk_id,
                    len(doc.content),
                    type(e),
                    e,
                )
                # Bare ``raise`` preserves the original traceback.
                raise
        logger.info("Finished chunking!")