Spaces:
Running
Running
| """Generate sparse text embeddings for each panel using Pinecone inference. | |
| Processes panels in chunks of CHUNK_SIZE, writing one parquet file per chunk. | |
| Resumable: skips chunks whose output file already exists. | |
| """ | |
| from pathlib import Path | |
| import os | |
| import time | |
| import pandas as pd | |
| from tqdm import tqdm | |
| from pinecone import Pinecone | |
# Input manifest read by main(); its "panel_id" and "search_text" columns are used.
MANIFEST_PATH = Path("data/comics/processed/panels_manifest.parquet")
# Final merged output written by merge_chunks().
OUTPUT_PATH = Path("data/comics/processed/text_sparse_vectors.parquet")
# Directory holding one "chunk_NNNNNN.parquet" file per processed chunk.
CHUNKS_DIR = Path("data/comics/processed/sparse_chunks")
# Number of manifest rows covered by a single chunk file.
CHUNK_SIZE = 10_000
# Maximum number of texts passed to embed_sparse_batch() per call.
BATCH_SIZE = 96
# Embedding model name; can be overridden with the SPARSE_EMBED_MODEL env var.
SPARSE_MODEL = os.environ.get("SPARSE_EMBED_MODEL", "pinecone-sparse-english-v0")
# Shared client, created at import time; raises KeyError if PINECONE_API_KEY is unset.
_pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
| def embed_sparse_batch(texts: list[str], input_type: str = "passage", retries: int = 5) -> list[dict | None]: | |
| non_empty = [(i, t) for i, t in enumerate(texts) if t.strip()] | |
| results: list[dict | None] = [None] * len(texts) | |
| if not non_empty: | |
| return results | |
| indices, batch_texts = zip(*non_empty) | |
| for attempt in range(retries): | |
| try: | |
| response = _pc.inference.embed( | |
| model=SPARSE_MODEL, | |
| inputs=list(batch_texts), | |
| parameters={"input_type": input_type}, | |
| ) | |
| for idx, embedding in zip(indices, response): | |
| if embedding.sparse_indices and embedding.sparse_values: | |
| results[idx] = { | |
| "indices": embedding.sparse_indices, | |
| "values": embedding.sparse_values, | |
| } | |
| return results | |
| except Exception as exc: | |
| if attempt == retries - 1: | |
| raise | |
| wait = 2 ** attempt | |
| print(f" Retry {attempt + 1}: {exc}; waiting {wait}s") | |
| time.sleep(wait) | |
| return results | |
| def embed_sparse_query(text: str) -> dict | None: | |
| if not text.strip(): | |
| return None | |
| return embed_sparse_batch([text], input_type="query")[0] | |
def merge_chunks() -> None:
    """Concatenate all chunk parquet files into the single output parquet.

    Chunk files are read in sorted filename order from ``CHUNKS_DIR`` and the
    combined frame is written to ``OUTPUT_PATH``. Prints a notice and does
    nothing when no chunk files are found.
    """
    chunk_files = sorted(CHUNKS_DIR.glob("chunk_*.parquet"))
    if chunk_files:
        print(f"Merging {len(chunk_files)} chunk files...")
        frames = []
        for chunk_file in chunk_files:
            frames.append(pd.read_parquet(chunk_file))
        merged = pd.concat(frames, ignore_index=True)
        merged.to_parquet(OUTPUT_PATH, index=False)
        print(f"Merged {len(merged):,} rows → {OUTPUT_PATH}")
    else:
        print("No chunk files to merge")
def done_chunks() -> set[int]:
    """Return the indices of chunks whose output file already exists.

    The index is parsed from the second ``_``-separated part of each
    ``chunk_*.parquet`` file stem in ``CHUNKS_DIR``. Returns an empty set
    when the directory does not exist.
    """
    finished: set[int] = set()
    if CHUNKS_DIR.exists():
        for chunk_file in CHUNKS_DIR.glob("chunk_*.parquet"):
            parts = chunk_file.stem.split("_")
            finished.add(int(parts[1]))
    return finished
def main():
    """Embed every panel's search text chunk by chunk, then merge the chunks.

    With ``--merge`` only the merge step runs. Otherwise the manifest is split
    into chunks of ``CHUNK_SIZE`` rows, chunks that already have an output
    file are skipped, and each remaining chunk is embedded in batches of
    ``BATCH_SIZE`` and written to its own parquet file. All chunk files are
    merged at the end.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--merge", action="store_true")
    args = parser.parse_args()
    if args.merge:
        merge_chunks()
        return

    CHUNKS_DIR.mkdir(parents=True, exist_ok=True)
    manifest = pd.read_parquet(MANIFEST_PATH)
    already_done = done_chunks()
    # Ceiling division: a final partial chunk still counts as a chunk.
    total_chunks = (len(manifest) + CHUNK_SIZE - 1) // CHUNK_SIZE
    remaining = [i for i in range(total_chunks) if i not in already_done]
    print(f"Total panels: {len(manifest):,} | chunks done: {len(already_done)}/{total_chunks} | remaining: {len(remaining)}")
    print(f"Model: {SPARSE_MODEL}")

    for position, chunk_idx in enumerate(remaining, start=1):
        start = chunk_idx * CHUNK_SIZE
        end = min(start + CHUNK_SIZE, len(manifest))
        chunk_df = manifest.iloc[start:end]
        # Missing search text becomes "", which embed_sparse_batch maps to None.
        texts = chunk_df["search_text"].fillna("").tolist()
        panel_ids = chunk_df["panel_id"].tolist()

        rows = []
        pbar = tqdm(range(0, len(texts), BATCH_SIZE), desc=f"chunk {chunk_idx+1}/{total_chunks}", leave=False)
        for start_b in pbar:
            batch = texts[start_b: start_b + BATCH_SIZE]
            batch_ids = panel_ids[start_b: start_b + BATCH_SIZE]
            sparse_results = embed_sparse_batch(batch)
            for pid, sparse in zip(batch_ids, sparse_results):
                rows.append({"panel_id": pid, "text_sparse": sparse})

        chunk_path = CHUNKS_DIR / f"chunk_{chunk_idx:06d}.parquet"
        # Write to a temporary name first and rename afterwards, so an
        # interrupted run never leaves a partial file that done_chunks()
        # would treat as a finished chunk. The ".tmp" suffix keeps the file
        # out of the "chunk_*.parquet" glob used for resuming and merging.
        tmp_path = chunk_path.with_name(chunk_path.name + ".tmp")
        pd.DataFrame(rows).to_parquet(tmp_path, index=False)
        tmp_path.replace(chunk_path)

        done_count = len(already_done) + position
        filled = sum(1 for r in rows if r["text_sparse"] is not None)
        print(f" chunk {chunk_idx:06d}: {len(rows)} rows ({filled} with vectors) [{done_count}/{total_chunks} done]")

    print("All chunks complete. Merging...")
    merge_chunks()
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()