Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import os | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| from tempfile import NamedTemporaryFile | |
| import pandas as pd | |
| from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download | |
# Hugging Face Hub repository holding the vote log, and its repo type.
VOTES_REPO_ID: str = "taigasan/e6-visual-ratings"
VOTES_REPO_TYPE: str = "dataset"
# Folder inside the repo where the `votes_*.parquet` shards live.
VOTES_LOG_SUBDIR: str = "ratings_log"
# Environment variable from which the Hub access token is read.
RATINGS_APP_TOKEN_ENV: str = "RATINGS_APP_TOKEN"
def _list_vote_shards(api: HfApi) -> list[str]:
    """Return the repo paths of all vote shards, in lexicographic order.

    A vote shard is any file in the ratings repo whose path starts with
    ``<VOTES_LOG_SUBDIR>/votes_`` and ends with ``.parquet``.
    """
    prefix = f"{VOTES_LOG_SUBDIR}/votes_"
    repo_files = api.list_repo_files(repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE)
    matches = [
        path
        for path in repo_files
        if path.startswith(prefix) and path.endswith(".parquet")
    ]
    matches.sort()
    return matches
def _new_compacted_shard_path() -> str:
    """Build a fresh repo path for a compacted vote shard.

    The file name embeds the current Unix time in whole seconds and a random
    UUID4 hex string. It follows the same ``votes_*.parquet`` naming that
    ``_list_vote_shards`` matches, so later compactions pick it up again.
    """
    stamp = int(time.time())
    suffix = uuid.uuid4().hex
    file_name = f"votes_{stamp}_{suffix}.parquet"
    return f"{VOTES_LOG_SUBDIR}/{file_name}"
def _load_vote_shards(shards: list[str], token: str | None) -> list[pd.DataFrame]:
    """Download each shard from the ratings repo and read it as a DataFrame."""
    frames = []
    for shard in shards:
        shard_local = hf_hub_download(
            repo_id=VOTES_REPO_ID,
            filename=shard,
            repo_type=VOTES_REPO_TYPE,
            token=token,
        )
        frames.append(pd.read_parquet(shard_local))
    return frames


def _write_verified_parquet(frame: pd.DataFrame, path: Path, expected_rows: int) -> int:
    """Write *frame* to *path* as parquet, then re-read it and check the row count.

    Returns:
        The number of rows found in the written file.

    Raises:
        RuntimeError: if the written file does not hold exactly *expected_rows* rows.
    """
    frame.to_parquet(path, index=False)
    # Verify the serialized file, since that is what gets uploaded. Checking
    # the in-memory concat result would be meaningless: a row-wise concat
    # always has exactly the summed length of its inputs.
    written_rows = len(pd.read_parquet(path))
    if written_rows != expected_rows:
        raise RuntimeError(
            f"Refusing to commit: row mismatch during compaction "
            f"({expected_rows} -> {written_rows})."
        )
    return written_rows


def compact_votes() -> tuple[int, int, str]:
    """Merge every vote shard in the ratings repo into a single new shard.

    Steps:
      1. List all ``votes_*.parquet`` shards under ``VOTES_LOG_SUBDIR``.
      2. Download the shards and concatenate them.
      3. Write the result to a temporary parquet file and check that the
         written file holds as many rows as the inputs combined.
      4. In one Hub commit, upload the new shard and delete the listed ones.

    The Hub token is read from the environment variable named by
    ``RATINGS_APP_TOKEN_ENV``. It is ``None`` if unset, and is passed as-is
    to ``HfApi`` and ``hf_hub_download``.

    Returns:
        ``(shard_count, row_count, compacted_path)``: the number of shards
        merged, the number of rows in the written shard, and the repo path of
        the new shard.

    Raises:
        FileNotFoundError: if the repo contains no vote shards.
        RuntimeError: if the written parquet file's row count differs from
            the total row count of the input shards (nothing is committed).
    """
    token = os.getenv(RATINGS_APP_TOKEN_ENV)
    api = HfApi(token=token)

    shards = _list_vote_shards(api)
    if not shards:
        raise FileNotFoundError(f"No vote shards found in {VOTES_REPO_ID}/{VOTES_LOG_SUBDIR}")

    frames = _load_vote_shards(shards, token)
    input_row_count = sum(len(frame) for frame in frames)
    combined = pd.concat(frames, ignore_index=True, sort=False)
    del frames  # concat copied the data; release the per-shard frames

    # delete=False: the handle is closed when the `with` exits so pandas can
    # reopen the file by path; the file is removed in the `finally` below.
    with NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
        tmp_path = Path(tmp.name)
    try:
        output_row_count = _write_verified_parquet(combined, tmp_path, input_row_count)
        compacted_path = _new_compacted_shard_path()
        operations = [
            CommitOperationAdd(path_or_fileobj=str(tmp_path), path_in_repo=compacted_path),
            # Only the shards listed above are deleted; any shard that appears
            # after the listing is left in place.
            *(CommitOperationDelete(path_in_repo=shard) for shard in shards),
        ]
        # The upload and the deletions go in a single commit, so no revision
        # contains both the old shards and the compacted copy.
        # NOTE(review): two compactions running at once are not guarded here;
        # consider create_commit(parent_commit=...) if that can happen.
        api.create_commit(
            repo_id=VOTES_REPO_ID,
            repo_type=VOTES_REPO_TYPE,
            commit_message=f"compact {len(shards)} vote shard(s)",
            operations=operations,
        )
    finally:
        tmp_path.unlink(missing_ok=True)
    return len(shards), output_row_count, compacted_path
def main() -> None:
    """Run one compaction pass and print a one-line summary to stdout."""
    result = compact_votes()
    shard_count, row_count, compacted_path = result
    summary = (
        f"Compacted {shard_count} shard(s) into "
        f"{VOTES_REPO_ID}/{compacted_path} "
        f"with {row_count} rows."
    )
    print(summary)


if __name__ == "__main__":
    main()