File size: 2,354 Bytes
3d2cda1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3bf52de
3d2cda1
 
 
 
3bf52de
 
3d2cda1
3bf52de
3d2cda1
 
 
 
 
 
 
3bf52de
3d2cda1
 
 
3bf52de
 
 
 
 
 
 
 
 
 
 
 
 
 
3d2cda1
3bf52de
3d2cda1
3bf52de
 
 
 
 
3d2cda1
3bf52de
3d2cda1
3bf52de
3d2cda1
 
 
 
 
3bf52de
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python3
from __future__ import annotations

import os
import time
import uuid

import pandas as pd
from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download

# Hub repository identifier passed as ``repo_id`` to every Hub API call below.
VOTES_REPO_ID = "taigasan/e6-visual-ratings"
# Repository type passed as ``repo_type`` alongside VOTES_REPO_ID.
VOTES_REPO_TYPE = "dataset"
# Directory inside the repo under which vote shard files are looked up and written.
VOTES_LOG_SUBDIR = "ratings_log"
# Name of the environment variable the Hub access token is read from.
RATINGS_APP_TOKEN_ENV = "RATINGS_APP_TOKEN"

def _list_vote_shards(api: HfApi) -> list[str]:
    """Return the repo paths of all vote shards, sorted ascending.

    A file counts as a shard when its path starts with
    ``<VOTES_LOG_SUBDIR>/votes_`` and ends with ``.parquet``.
    """
    prefix = f"{VOTES_LOG_SUBDIR}/votes_"
    repo_files = api.list_repo_files(repo_id=VOTES_REPO_ID, repo_type=VOTES_REPO_TYPE)
    shard_paths = [
        path
        for path in repo_files
        if path.startswith(prefix) and path.endswith(".parquet")
    ]
    shard_paths.sort()
    return shard_paths

def _new_compacted_shard_path() -> str:
    """Build a fresh repo path for a compacted shard file.

    The file name combines the current Unix time, truncated to whole
    seconds, with a random UUID4 hex string.
    """
    timestamp = int(time.time())
    unique_suffix = uuid.uuid4().hex
    return f"{VOTES_LOG_SUBDIR}/votes_{timestamp}_{unique_suffix}.parquet"

def compact_votes() -> tuple[int, int, str] | None:
    """Merge every vote shard in the votes repo into a single Parquet file.

    Lists the shards, downloads and concatenates them, then adds the
    combined file and deletes the original shards in one commit.

    Returns:
        ``(shard_count, row_count, compacted_path)`` describing the merge, or
        ``None`` when fewer than two shards exist and there is nothing to
        compact.

    Raises:
        AssertionError: If the combined frame's row count differs from the
            sum of the individual shard row counts. The check runs before
            the commit, so no shard is deleted in that case.
    """
    token = os.getenv(RATINGS_APP_TOKEN_ENV)
    api = HfApi(token=token)

    shards = _list_vote_shards(api)
    if len(shards) < 2:
        return None

    frames: list[pd.DataFrame] = [
        pd.read_parquet(
            hf_hub_download(
                repo_id=VOTES_REPO_ID,
                filename=shard,
                repo_type=VOTES_REPO_TYPE,
                token=token,
            )
        )
        for shard in shards
    ]

    combined = pd.concat(frames, ignore_index=True, sort=False)

    # Explicit raise instead of `assert`: asserts are stripped under
    # `python -O`, and this check guards a commit that deletes the shards.
    row_count = len(combined)
    expected_rows = sum(len(frame) for frame in frames)
    if row_count != expected_rows:
        raise AssertionError(
            f"compacted frame has {row_count} rows, expected {expected_rows}"
        )

    # With no path argument, to_parquet returns the serialized bytes.
    compacted_data = combined.to_parquet(index=False)
    compacted_path = _new_compacted_shard_path()
    api.create_commit(
        repo_id=VOTES_REPO_ID,
        repo_type=VOTES_REPO_TYPE,
        commit_message=f"compact {len(shards)} vote shards",
        operations=[
            CommitOperationAdd(path_or_fileobj=compacted_data, path_in_repo=compacted_path),
            *(CommitOperationDelete(path_in_repo=shard) for shard in shards),
        ],
    )

    return len(shards), row_count, compacted_path

def _main() -> None:
    """Run the compaction once and print a one-line summary of the outcome."""
    result = compact_votes()
    if result is None:
        # Plain literal: the message has no placeholders, so no f-prefix.
        print("Nothing to compact.")
        return

    shard_count, row_count, compacted_path = result
    print(
        f"Compacted {shard_count} shards into "
        f"{VOTES_REPO_ID}/{compacted_path} "
        f"with {row_count} rows."
    )

# Run the compaction only when this file is executed as a script.
if __name__ == "__main__":
    _main()