Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import os | |
| import time | |
| import uuid | |
| import pandas as pd | |
| from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download | |
| VOTES_REPO_ID = "taigasan/e6-visual-ratings" | |
| VOTES_REPO_TYPE = "dataset" | |
| VOTES_LOG_SUBDIR = "ratings_log" | |
| RATINGS_APP_TOKEN_ENV = "RATINGS_APP_TOKEN" | |
| def _list_vote_shards(api: HfApi) -> list[str]: | |
| files = api.list_repo_files(repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE) | |
| shard_prefix = f"{VOTES_LOG_SUBDIR}/votes_" | |
| return sorted( | |
| f | |
| for f in files | |
| if f.startswith(shard_prefix) | |
| and f.endswith(".parquet") | |
| ) | |
| def _new_compacted_shard_path() -> str: | |
| ts = int(time.time()) | |
| return f"{VOTES_LOG_SUBDIR}/votes_{ts}_{uuid.uuid4().hex}.parquet" | |
| def compact_votes() -> tuple[int, int, str] | None: | |
| token = os.getenv(RATINGS_APP_TOKEN_ENV) | |
| api = HfApi(token=token) | |
| shards = _list_vote_shards(api) | |
| if len(shards) < 2: | |
| return None | |
| frames: list[pd.DataFrame] = [] | |
| for shard in shards: | |
| shard_local = hf_hub_download( | |
| repo_id=VOTES_REPO_ID, | |
| filename=shard, | |
| repo_type=VOTES_REPO_TYPE, | |
| token=token, | |
| ) | |
| frames.append(pd.read_parquet(shard_local)) | |
| combined = pd.concat(frames, ignore_index=True, sort=False) | |
| output_row_count = len(combined) | |
| assert len(combined) == sum(len(frame) for frame in frames) | |
| compacted_data = combined.to_parquet(index=False) | |
| compacted_path = _new_compacted_shard_path() | |
| api.create_commit( | |
| repo_id=VOTES_REPO_ID, | |
| repo_type=VOTES_REPO_TYPE, | |
| commit_message=f"compact {len(shards)} vote shards", | |
| operations=[ | |
| CommitOperationAdd(path_or_fileobj=compacted_data, path_in_repo=compacted_path), | |
| *(CommitOperationDelete(path_in_repo=shard) for shard in shards), | |
| ], | |
| ) | |
| return len(shards), len(combined), compacted_path | |
| def _main() -> None: | |
| result = compact_votes() | |
| if result is None: | |
| print(f"Nothing to compact.") | |
| return | |
| shard_count, row_count, compacted_path = result | |
| print( | |
| f"Compacted {shard_count} shards into " | |
| f"{VOTES_REPO_ID}/{compacted_path} " | |
| f"with {row_count} rows." | |
| ) | |
| if __name__ == "__main__": | |
| _main() | |