# e6-visual-ratings / compact_logs.py
# RedHotTensors's picture
# Clean up compact_logs.py
# 3bf52de
#!/usr/bin/env python3
from __future__ import annotations
import os
import time
import uuid
import pandas as pd
from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download
# Hub repository holding the vote shards; passed as ``repo_id`` to every Hub call.
VOTES_REPO_ID = "taigasan/e6-visual-ratings"
# Passed as ``repo_type`` alongside VOTES_REPO_ID.
VOTES_REPO_TYPE = "dataset"
# Directory inside the repo where ``votes_*.parquet`` shards are read and written.
VOTES_LOG_SUBDIR = "ratings_log"
# Name of the environment variable the Hub access token is read from.
RATINGS_APP_TOKEN_ENV = "RATINGS_APP_TOKEN"
def _list_vote_shards(api: HfApi) -> list[str]:
    """Return the repo paths of all vote shards, in sorted order.

    A vote shard is any file in the votes repo whose path starts with
    ``<VOTES_LOG_SUBDIR>/votes_`` and ends with ``.parquet``.

    Args:
        api: Hub client used to list the repository's files.

    Returns:
        The matching repo-relative paths, sorted ascending.
    """
    prefix = f"{VOTES_LOG_SUBDIR}/votes_"
    repo_files = api.list_repo_files(repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE)
    shard_paths = [
        path
        for path in repo_files
        if path.startswith(prefix) and path.endswith(".parquet")
    ]
    shard_paths.sort()
    return shard_paths
def _new_compacted_shard_path() -> str:
    """Build a fresh repo path for a compacted shard.

    The file name combines the current Unix time in whole seconds with a
    random UUID4 hex string: ``<VOTES_LOG_SUBDIR>/votes_<ts>_<uuid>.parquet``.
    """
    epoch_seconds = int(time.time())
    unique_suffix = uuid.uuid4().hex
    filename = f"votes_{epoch_seconds}_{unique_suffix}.parquet"
    return f"{VOTES_LOG_SUBDIR}/{filename}"
def compact_votes() -> tuple[int, int, str] | None:
    """Merge all vote shards in the votes repo into one new parquet shard.

    Lists the shard files, downloads and concatenates them, then issues a
    single commit that adds the combined shard and deletes the originals.

    Returns:
        ``(shard_count, row_count, compacted_path)``: the number of shards
        merged, the number of rows in the combined shard, and the repo path
        of the new shard. ``None`` when fewer than two shards exist, in
        which case nothing is downloaded or committed.

    Raises:
        AssertionError: If the combined frame's row count differs from the
            summed row counts of the input shards. Raised before the commit,
            so no shard is deleted.
    """
    # os.getenv yields None when the variable is unset; it is passed through as-is.
    token = os.getenv(RATINGS_APP_TOKEN_ENV)
    api = HfApi(token=token)

    shards = _list_vote_shards(api)
    if len(shards) < 2:
        return None

    frames: list[pd.DataFrame] = []
    for shard in shards:
        shard_local = hf_hub_download(
            repo_id=VOTES_REPO_ID,
            filename=shard,
            repo_type=VOTES_REPO_TYPE,
            token=token,
        )
        frames.append(pd.read_parquet(shard_local))

    combined = pd.concat(frames, ignore_index=True, sort=False)

    # Guard against losing rows before the source shards are deleted. This is
    # an explicit raise rather than an ``assert`` statement so the check still
    # runs when Python is started with -O.
    input_row_count = sum(len(frame) for frame in frames)
    output_row_count = len(combined)
    if output_row_count != input_row_count:
        raise AssertionError(
            f"compacted row count {output_row_count} does not match "
            f"input row count {input_row_count}"
        )

    # With no path argument, to_parquet returns the serialized bytes.
    compacted_data = combined.to_parquet(index=False)
    compacted_path = _new_compacted_shard_path()

    # One commit carries both the new shard and the deletions of the old ones.
    api.create_commit(
        repo_id=VOTES_REPO_ID,
        repo_type=VOTES_REPO_TYPE,
        commit_message=f"compact {len(shards)} vote shards",
        operations=[
            CommitOperationAdd(path_or_fileobj=compacted_data, path_in_repo=compacted_path),
            *(CommitOperationDelete(path_in_repo=shard) for shard in shards),
        ],
    )
    return len(shards), output_row_count, compacted_path
def _main() -> None:
    """Run one compaction pass and print a one-line summary of the outcome."""
    result = compact_votes()
    if result is None:
        # compact_votes() returns None when it did not compact anything.
        print("Nothing to compact.")
        return

    shard_count, row_count, compacted_path = result
    print(
        f"Compacted {shard_count} shards into "
        f"{VOTES_REPO_ID}/{compacted_path} "
        f"with {row_count} rows."
    )
# Run the compaction only when executed as a script, not when imported.
if __name__ == "__main__":
    _main()