#!/usr/bin/env python3 from __future__ import annotations import os import time import uuid import pandas as pd from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download VOTES_REPO_ID = "taigasan/e6-visual-ratings" VOTES_REPO_TYPE = "dataset" VOTES_LOG_SUBDIR = "ratings_log" RATINGS_APP_TOKEN_ENV = "RATINGS_APP_TOKEN" def _list_vote_shards(api: HfApi) -> list[str]: files = api.list_repo_files(repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE) shard_prefix = f"{VOTES_LOG_SUBDIR}/votes_" return sorted( f for f in files if f.startswith(shard_prefix) and f.endswith(".parquet") ) def _new_compacted_shard_path() -> str: ts = int(time.time()) return f"{VOTES_LOG_SUBDIR}/votes_{ts}_{uuid.uuid4().hex}.parquet" def compact_votes() -> tuple[int, int, str] | None: token = os.getenv(RATINGS_APP_TOKEN_ENV) api = HfApi(token=token) shards = _list_vote_shards(api) if len(shards) < 2: return None frames: list[pd.DataFrame] = [] for shard in shards: shard_local = hf_hub_download( repo_id=VOTES_REPO_ID, filename=shard, repo_type=VOTES_REPO_TYPE, token=token, ) frames.append(pd.read_parquet(shard_local)) combined = pd.concat(frames, ignore_index=True, sort=False) output_row_count = len(combined) assert len(combined) == sum(len(frame) for frame in frames) compacted_data = combined.to_parquet(index=False) compacted_path = _new_compacted_shard_path() api.create_commit( repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE, commit_message=f"compact {len(shards)} vote shards", operations=[ CommitOperationAdd(path_or_fileobj=compacted_data, path_in_repo=compacted_path), *(CommitOperationDelete(path_in_repo=shard) for shard in shards), ], ) return len(shards), len(combined), compacted_path def _main() -> None: result = compact_votes() if result is None: print(f"Nothing to compact.") return shard_count, row_count, compacted_path = result print( f"Compacted {shard_count} shards into " f"{VOTES_REPO_ID}/{compacted_path} " f"with {row_count} rows." ) if __name__ == "__main__": _main()