#!/usr/bin/env python3 from __future__ import annotations import os import time import uuid from pathlib import Path from tempfile import NamedTemporaryFile import pandas as pd from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download VOTES_REPO_ID = "taigasan/e6-visual-ratings" VOTES_REPO_TYPE = "dataset" VOTES_LOG_SUBDIR = "ratings_log" RATINGS_APP_TOKEN_ENV = "RATINGS_APP_TOKEN" def _list_vote_shards(api: HfApi) -> list[str]: files = api.list_repo_files(repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE) shard_prefix = f"{VOTES_LOG_SUBDIR}/votes_" return sorted( f for f in files if f.startswith(shard_prefix) and f.endswith(".parquet") ) def _new_compacted_shard_path() -> str: ts = int(time.time()) return f"{VOTES_LOG_SUBDIR}/votes_{ts}_{uuid.uuid4().hex}.parquet" def compact_votes() -> tuple[int, int, str]: token = os.getenv(RATINGS_APP_TOKEN_ENV) api = HfApi(token=token) shards = _list_vote_shards(api) if not shards: raise FileNotFoundError(f"No vote shards found in {VOTES_REPO_ID}/{VOTES_LOG_SUBDIR}") frames = [] for shard in shards: shard_local = hf_hub_download( repo_id=VOTES_REPO_ID, filename=shard, repo_type=VOTES_REPO_TYPE, token=token, ) frame = pd.read_parquet(shard_local) frames.append(frame) input_row_count = sum(len(frame) for frame in frames) combined = pd.concat(frames, ignore_index=True, sort=False) output_row_count = int(len(combined)) if output_row_count != input_row_count: raise RuntimeError( f"Refusing to commit: row mismatch during compaction " f"({input_row_count} -> {output_row_count})." ) with NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: tmp_path = Path(tmp.name) try: combined.to_parquet(tmp_path, index=False) compacted_path = _new_compacted_shard_path() operations = [ CommitOperationAdd(path_or_fileobj=str(tmp_path), path_in_repo=compacted_path), *[CommitOperationDelete(path_in_repo=shard) for shard in shards], ] api.create_commit( repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE, commit_message=f"compact {len(shards)} vote shard(s)", operations=operations, ) finally: tmp_path.unlink(missing_ok=True) return len(shards), output_row_count, compacted_path def main() -> None: shard_count, row_count, compacted_path = compact_votes() print( f"Compacted {shard_count} shard(s) into " f"{VOTES_REPO_ID}/{compacted_path} " f"with {row_count} rows." ) if __name__ == "__main__": main()