e6-visual-ratings / compact_logs.py
taigasan's picture
Change sampling
3d2cda1
raw
history blame
2.82 kB
#!/usr/bin/env python3
from __future__ import annotations
import os
import time
import uuid
from pathlib import Path
from tempfile import NamedTemporaryFile
import pandas as pd
from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download
# Hub repository that holds the vote shards, and the repo type passed
# alongside it on every Hub call in this script.
VOTES_REPO_ID = "taigasan/e6-visual-ratings"
VOTES_REPO_TYPE = "dataset"
# Directory inside the repo where `votes_*.parquet` shards are looked up and
# where the compacted shard is written.
VOTES_LOG_SUBDIR = "ratings_log"
# Name of the environment variable from which the Hub access token is read.
RATINGS_APP_TOKEN_ENV = "RATINGS_APP_TOKEN"
def _list_vote_shards(api: HfApi) -> list[str]:
    """Return the repo paths of all vote shards, sorted ascending.

    A file counts as a vote shard when its path starts with
    ``<VOTES_LOG_SUBDIR>/votes_`` and ends with ``.parquet``.

    Args:
        api: Hub client used to list the files of the votes repository.

    Returns:
        The matching repo-relative paths in sorted order (possibly empty).
    """
    repo_files = api.list_repo_files(repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE)
    prefix = f"{VOTES_LOG_SUBDIR}/votes_"
    shard_paths = [
        path
        for path in repo_files
        if path.startswith(prefix) and path.endswith(".parquet")
    ]
    shard_paths.sort()
    return shard_paths
def _new_compacted_shard_path() -> str:
    """Build a fresh repo path for a compacted shard.

    The name combines the current Unix time in whole seconds with a random
    UUID4 hex string: ``<VOTES_LOG_SUBDIR>/votes_<ts>_<uuid>.parquet``.
    """
    timestamp = int(time.time())
    unique_suffix = uuid.uuid4().hex
    return f"{VOTES_LOG_SUBDIR}/votes_{timestamp}_{unique_suffix}.parquet"
def compact_votes() -> tuple[int, int, str]:
    """Merge all vote shards in the votes repo into a single parquet shard.

    Lists the shards under ``VOTES_LOG_SUBDIR``, downloads and concatenates
    them, then uploads the combined file and deletes the source shards in a
    single commit.

    When exactly one shard exists there is nothing to merge: no commit is
    created and the existing shard's path is returned unchanged. This avoids
    adding a no-op commit (and a redundant copy of the data) on every run
    against an already-compacted repository.

    Returns:
        A ``(shard_count, row_count, shard_path)`` tuple: the number of shards
        found, the total number of rows, and the repo path of the shard that
        holds those rows after the call.

    Raises:
        FileNotFoundError: If the repository contains no vote shards.
        RuntimeError: If the concatenated frame's row count differs from the
            sum of the input shards' row counts.
    """
    # None when the variable is unset; passed through to the Hub client as-is.
    token = os.getenv(RATINGS_APP_TOKEN_ENV)
    api = HfApi(token=token)

    shards = _list_vote_shards(api)
    if not shards:
        raise FileNotFoundError(f"No vote shards found in {VOTES_REPO_ID}/{VOTES_LOG_SUBDIR}")

    # Download and load each shard in listing order.
    frames = [
        pd.read_parquet(
            hf_hub_download(
                repo_id=VOTES_REPO_ID,
                filename=shard,
                repo_type=VOTES_REPO_TYPE,
                token=token,
            )
        )
        for shard in shards
    ]
    input_row_count = sum(len(frame) for frame in frames)

    if len(shards) == 1:
        # A lone shard is already compact: rewriting it would only swap one
        # file for an equivalent one under a new name.
        return 1, input_row_count, shards[0]

    combined = pd.concat(frames, ignore_index=True, sort=False)
    output_row_count = int(len(combined))
    # Safety net before deleting anything: never commit if rows went missing
    # or were duplicated while concatenating.
    if output_row_count != input_row_count:
        raise RuntimeError(
            "Refusing to commit: row mismatch during compaction "
            f"({input_row_count} -> {output_row_count})."
        )

    # delete=False keeps the file on disk after the handle is closed so it can
    # be written and uploaded by path; it is removed in the finally below.
    with NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
        tmp_path = Path(tmp.name)
    try:
        combined.to_parquet(tmp_path, index=False)
        compacted_path = _new_compacted_shard_path()
        # One commit both adds the merged shard and removes its sources.
        operations = [
            CommitOperationAdd(path_or_fileobj=str(tmp_path), path_in_repo=compacted_path),
            *[CommitOperationDelete(path_in_repo=shard) for shard in shards],
        ]
        api.create_commit(
            repo_id=VOTES_REPO_ID,
            repo_type=VOTES_REPO_TYPE,
            commit_message=f"compact {len(shards)} vote shard(s)",
            operations=operations,
        )
    finally:
        tmp_path.unlink(missing_ok=True)

    return len(shards), output_row_count, compacted_path
def main() -> None:
    """Run one compaction pass and print a one-line summary of the result."""
    n_shards, n_rows, target_path = compact_votes()
    summary = (
        f"Compacted {n_shards} shard(s) into "
        f"{VOTES_REPO_ID}/{target_path} "
        f"with {n_rows} rows."
    )
    print(summary)


# Compact only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()