"""*A cross-lingual semantic similarity preserving hash for plain-text content (soft hash).*

The ISCC Text-Code Semantic is a content-based compact binary code generated from multilingual text.

!!! Warning

    This is a non-standard Proof of Concept implementation.
    Plain-text extraction from documents in various formats (especially PDF) may
    yield different results depending on the extraction tools being used.
    The [iscc-sdk](https://github.com/iscc/iscc-sdk) uses [Apache Tika](https://tika.apache.org)
    to extract text from documents for Text-Code generation.

**Algorithm overview**

- Split the text into semantically coherent, overlapping chunks.
- Create vector embeddings of the chunks.
- Average and binarize the chunk embeddings.
- Encode the result as an ISCC-UNIT of MainType SEMANTIC and SubType TEXT.
"""
|
from loguru import logger as log
from onnxruntime.capi.onnxruntime_pybind11_state import NoSuchFile
from semantic_text_splitter import TextSplitter
from tokenizers import Tokenizer
from pathlib import Path
from typing import Any
import numpy as np
import onnxruntime as rt
from numpy.typing import NDArray
from functools import cache
import iscc_sct as sct


HERE = Path(__file__).parent.absolute()


__all__ = [
    "code_text_semantic",
    "gen_text_code_semantic",
    "soft_hash_text_semantic",
    "embed_chunks",
]


BIT_LEN_MAP = {
    32: "0000",
    64: "0001",
    96: "0010",
    128: "0011",
    160: "0100",
    192: "0101",
    224: "0110",
    256: "0111",
}


TOKENIZER_PATH = HERE / "tokenizer.json"
MAINTYPE = "0001"  # SEMANTIC
SUBTYPE = "0000"  # TEXT
SCT_VERSION = "0000"  # V0
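
# The 2-byte ISCC-UNIT header is assembled from the three nibbles above plus a
# length nibble from BIT_LEN_MAP (see `gen_text_code_semantic` below). A sketch
# of the layout for the default 64-bit code:
#
#   MAINTYPE  SUBTYPE  VERSION  LENGTH
#   0001      0000     0000     0001    -> 0b0001000000000001 -> b"\x10\x01"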


def code_text_semantic(fp, **options):
    """
    Generate ISCC Semantic-Code Text from a text file.

    NOTE:
        If you enable generating granular features with `simprints=True` those features will
        have the same bit-length as the generated ISCC-UNIT.

    :param fp: File path of plaintext file to process
    :param options: Custom processing options for overriding global options
    :key bits (int): Length of generated Semantic Text-Code in bits (default 64)
    :key characters (bool): Return document character count (default True).
    :key embedding (bool): Return global document embedding (default False).
    :key precision (int): Max fractional digits for embeddings (default 8).
    :key simprints (bool): Return granular document features (default False).
    :key offsets (bool): Return character offsets for granular features (default False).
    :key sizes (bool): Include sizes of granular features (number of chars, default False).
    :key contents (bool): Return text chunks (default False).
    :key max_tokens (int): Max tokens per chunk (default 127).
    :key overlap (int): Max tokens allowed to overlap between chunks (default 48).
    :key trim (bool): Trim whitespace from chunks (default False).
    :return: Dict with ISCC processing results
    """
    fp = Path(fp)
    return gen_text_code_semantic(fp.read_text(encoding="utf-8"), **options)
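
# Usage sketch (illustrative, not executed on import); the file path is a
# hypothetical placeholder:
#
#     result = code_text_semantic("document.txt", bits=64, simprints=True)
#     print(result["iscc"])        # e.g. "ISCC:..." (base32-encoded unit)
#     print(result["characters"])  # document character count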


def gen_text_code_semantic(text, **options):
    """
    Create an ISCC Semantic-Code Text from plaintext.

    :param str text: Plain text for ISCC processing
    :param options: Custom processing options for overriding global options
    :key bits (int): Length of generated Semantic Text-Code in bits (default 64)
    :key characters (bool): Return document character count (default True).
    :key embedding (bool): Return global document embedding (default False).
    :key precision (int): Max fractional digits for embeddings (default 8).
    :key simprints (bool): Return granular document features (default False).
    :key offsets (bool): Return character offsets for granular features (default False).
    :key sizes (bool): Include sizes of granular features (number of chars, default False).
    :key contents (bool): Return text chunks (default False).
    :key max_tokens (int): Max tokens per chunk (default 127).
    :key overlap (int): Max tokens allowed to overlap between chunks (default 48).
    :key trim (bool): Trim whitespace from chunks (default False).
    :return: Dict with ISCC processing results (using Index-Format for granular features)
    """
    if not text:
        raise ValueError("Input text cannot be empty.")

    opts = sct.sct_opts.override(options)

    result = {"iscc": None}

    if opts.characters:
        result["characters"] = len(text)

    # Split text into semantically coherent chunks
    splits = split_text(text, **opts.model_dump())
    offsets, chunks = [list(item) for item in zip(*splits)]

    # Create chunk embeddings
    with sct.timer("EMBEDDING time"):
        embeddings = embed_chunks(chunks)

    # Create document embedding via mean pooling of chunk embeddings
    embedding = mean_pooling(embeddings)

    if any([opts.simprints, opts.offsets, opts.sizes, opts.contents, opts.embedding]):
        feature_set = {
            "maintype": "semantic",
            "subtype": "text",
            "version": 0,
        }
        if opts.embedding:
            feature_set["embedding"] = compress(embedding, opts.precision)
        if opts.simprints:
            feature_digests = [binarize(vec)[: opts.bits_granular // 8] for vec in embeddings]
            feature_set["simprints"] = [sct.encode_base64(digest) for digest in feature_digests]
        if opts.offsets:
            feature_set["offsets"] = offsets
        if opts.sizes:
            feature_set["sizes"] = [len(chunk) for chunk in chunks]
        if opts.contents:
            feature_set["contents"] = chunks
        result["features"] = [feature_set]

    # Encode header and body of the ISCC-UNIT
    length = BIT_LEN_MAP[opts.bits]
    header = int(MAINTYPE + SUBTYPE + SCT_VERSION + length, 2).to_bytes(2, byteorder="big")
    digest = binarize(embedding)[: opts.bits // 8]
    code = sct.encode_base32(header + digest)
    result["iscc"] = "ISCC:" + code
    return result
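
# Example (sketch): a default call returns only the ISCC and the character
# count; the extra keys appear when the corresponding options are enabled:
#
#     gen_text_code_semantic("Some multilingual sample text", bits=64)
#     # -> {"iscc": "ISCC:...", "characters": 29}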


def soft_hash_text_semantic(text):
    """Create a 256-bit semantic similarity preserving hash for text input."""
    chunks = [item[1] for item in split_text(text)]
    embeddings = embed_chunks(chunks)
    embedding = mean_pooling(embeddings)
    digest = binarize(embedding)
    return digest
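
# Similarity sketch: because the hash is similarity preserving, related texts
# yield digests with a small Hamming distance. A minimal comparison helper
# (illustrative, not part of the public API):
#
#     def hamming_distance(digest_a: bytes, digest_b: bytes) -> int:
#         return sum(bin(a ^ b).count("1") for a, b in zip(digest_a, digest_b))
#
#     d = hamming_distance(soft_hash_text_semantic(t1), soft_hash_text_semantic(t2))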


def split_text(text, **options):
    """
    Split text into semantically coherent chunks for embedding.

    :param text: Text to split.
    :param options: Custom processing options for overriding global options
    :key max_tokens (int): Max tokens per chunk (default 127).
    :key overlap (int): Max tokens allowed to overlap between chunks (default 48).
    :key trim (bool): Trim whitespace from chunks (default False).
    :return: A list of (offset, chunk) tuples [(offset, chunk), ...]
    """
    opts = sct.sct_opts.override(options)
    return splitter(**opts.model_dump()).chunk_indices(text)
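
# Example (sketch): offsets are the character positions of each chunk in the
# input, so the pairs can be unzipped as done in `gen_text_code_semantic`:
#
#     splits = split_text("Some longer document text ...")
#     offsets, chunks = [list(item) for item in zip(*splits)]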


@cache
def tokenizer():
    """
    Load and cache the tokenizer from the bundled tokenizer.json file.

    :return: An instance of the Tokenizer.
    """
    with sct.timer("TOKENIZER load time"):
        return Tokenizer.from_file(TOKENIZER_PATH.as_posix())


@cache
def splitter(**options):
    """
    Load and cache the text splitter, initialized with the tokenizer.

    :param options: Custom processing options for overriding global options
    :key max_tokens (int): Max tokens per chunk (default 127).
    :key overlap (int): Max tokens allowed to overlap between chunks (default 48).
    :key trim (bool): Trim whitespace from chunks (default False).
    :return: An instance of TextSplitter.
    """
    opts = sct.sct_opts.override(options)
    with sct.timer("TEXTSPLITTER load time"):
        return TextSplitter.from_huggingface_tokenizer(
            tokenizer(), capacity=opts.max_tokens, overlap=opts.overlap, trim=opts.trim
        )


@cache
def model():
    """
    Load and cache the ONNX inference model from a specified path.

    :return: An ONNX inference session.
    """
    available_onnx_providers = rt.get_available_providers()
    log.debug(f"Available ONNX providers {', '.join(available_onnx_providers)}")
    selected_onnx_providers = ["CPUExecutionProvider"]
    if "CUDAExecutionProvider" in available_onnx_providers:
        selected_onnx_providers.insert(0, "CUDAExecutionProvider")
    log.debug(f"Using ONNX providers {', '.join(selected_onnx_providers)}")
    so = rt.SessionOptions()
    so.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL
    try:
        with sct.timer("ONNXMODEL load time"):
            return rt.InferenceSession(
                sct.MODEL_PATH, sess_options=so, providers=selected_onnx_providers
            )
    except NoSuchFile:
        # Model file not found locally - acquire it and retry loading
        with sct.timer("ONNXMODEL acquisition/load time"):
            model_path = sct.get_model()
            return rt.InferenceSession(
                model_path, sess_options=so, providers=selected_onnx_providers
            )


def tokenize_chunks(chunks):
    """
    Tokenize text chunks into model-compatible formats.

    :param chunks: Text chunks to tokenize.
    :return: Dictionary of tokenized data including input IDs, attention masks, and type IDs.
    """
    encodings = tokenizer().encode_batch(chunks)
    input_ids = np.array([encoding.ids for encoding in encodings], dtype=np.int64)
    attention_mask = np.array([encoding.attention_mask for encoding in encodings], dtype=np.int64)
    type_ids = np.array([encoding.type_ids for encoding in encodings], dtype=np.int64)
    return {"input_ids": input_ids, "attention_mask": attention_mask, "token_type_ids": type_ids}


def embed_chunks(chunks, batch_size=100):
    """
    Embed text chunks and return vector embeddings.

    :param chunks: Text chunks to embed.
    :param batch_size: Number of chunks to process in each batch.
    :return: An array of embeddings for each chunk.
    """
    embeddings = []
    for start_idx in range(0, len(chunks), batch_size):
        batch_chunks = chunks[start_idx : start_idx + batch_size]
        tokens = tokenize_chunks(batch_chunks)
        token_embeddings = embed_tokens(tokens)
        batch_embeddings = attention_pooling(token_embeddings, tokens["attention_mask"])
        embeddings.append(batch_embeddings)
    return np.vstack(embeddings)
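
# Usage sketch: the result is a 2D array with one L2-normalized row per chunk
# (the embedding width depends on the bundled model):
#
#     vectors = embed_chunks(["first chunk", "second chunk"])
#     vectors.shape  # -> (2, <embedding dimension>)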


def embed_tokens(tokens):
    """
    Create embeddings from tokenized text chunks using the model.

    :param tokens: Tokenized text data.
    :return: An array of embeddings.
    """
    result = model().run(None, tokens)
    return np.array(result[0])


def attention_pooling(token_embeddings, attention_mask):
    """
    Apply attention-mask-based mean pooling to the token embeddings.

    :param token_embeddings: Raw token embeddings from the model.
    :param attention_mask: Attention masks for the embeddings.
    :return: An array of pooled and normalized embeddings.
    """
    input_mask_expanded = attention_mask[:, :, None].astype(np.float32)
    sum_embeddings = np.sum(token_embeddings * input_mask_expanded, axis=1)
    sum_mask = np.clip(np.sum(input_mask_expanded, axis=1), a_min=1e-9, a_max=None)
    mean_pooled = sum_embeddings / sum_mask
    norm = np.linalg.norm(mean_pooled, ord=2, axis=1, keepdims=True)
    result = mean_pooled / np.clip(norm, a_min=1e-9, a_max=None)
    return result.astype(np.float32)
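
# For each sequence, with token embeddings e_t and attention mask m_t, this
# computes v = sum_t(m_t * e_t) / max(sum_t(m_t), 1e-9), followed by the
# L2 normalization v / max(||v||_2, 1e-9).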


def mean_pooling(embeddings):
    """
    Calculate the document vector from chunk embeddings using mean pooling.

    :param embeddings: Chunk embeddings.
    :return: A normalized document vector.
    """
    document_vector = embeddings.mean(axis=0)
    return document_vector / np.linalg.norm(document_vector)


def binarize(vec):
    """
    Binarize an embedding vector into a hash digest.

    :param vec: Vector to be binarized.
    :return: A bytes object representing the binary hash.
    """
    return bytes(np.packbits(np.array(vec) >= 0))
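
# Worked example: np.packbits consumes bits most-significant first, so a vector
# with alternating signs binarizes to 0b10101010 per 8 components:
#
#     binarize([1.0, -1.0, 0.0, -0.5, 2.0, -2.0, 3.0, -3.0])  # -> b"\xaa"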


def compress(vec, precision):
    """
    Round vector values to the specified precision to reduce storage requirements.

    :param vec: Embedding vector.
    :param precision: Max number of fractional decimal places.
    :return: Vector as native Python list of rounded floats.
    """
    rounded_array = np.around(vec, decimals=precision)
    compress_list = [round(x, precision) for x in rounded_array.tolist()]
    return compress_list
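
# Example: compress([0.12345678951, -0.98765432149], 8) -> [0.12345679, -0.98765432].
# The second round() appears to guard against binary float artifacts that can
# resurface when NumPy values are converted to Python floats via tolist().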