File size: 2,995 Bytes
dbf7313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from __future__ import annotations

from pathlib import Path

from slop_farmer.config import SnapshotAdoptOptions
from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_json

__all__ = ["adopt_snapshot_for_pipeline"]


def adopt_snapshot_for_pipeline(options: SnapshotAdoptOptions) -> Path:
    """Record an existing snapshot as the pipeline's latest snapshot and watermark.

    Reads ``manifest.json`` from ``options.snapshot_dir`` and writes two JSON
    files under ``options.output_dir``: ``snapshots/latest.json`` and
    ``state/watermark.json``.

    The ``next_since`` value is the first truthy entry among, in order:
    ``options.next_since``, the manifest watermark's ``next_since``, the
    manifest's ``crawl_started_at``, a value inferred from the snapshot tables,
    and the manifest's ``extracted_at``.

    Args:
        options: Supplies ``snapshot_dir``, ``output_dir`` and an optional
            ``next_since`` override.

    Returns:
        The path of the written ``snapshots/latest.json`` file.

    Raises:
        ValueError: If the manifest has no ``repo``, or if no non-empty string
            ``next_since`` could be determined.
    """
    source_dir = options.snapshot_dir.resolve()
    manifest_file = source_dir / "manifest.json"
    meta = read_json(manifest_file)

    repo_name = str(meta.get("repo") or "")
    if repo_name == "":
        raise ValueError(f"Snapshot manifest has no repo: {manifest_file}")

    # Fall back to the directory name when the manifest carries no id.
    adopted_id = str(meta.get("snapshot_id") or source_dir.name)

    raw_watermark = meta.get("watermark")
    prior_watermark = raw_watermark if isinstance(raw_watermark, dict) else {}

    # Walk the fallbacks one at a time so the table scan only happens when the
    # cheaper sources yield nothing.
    cursor = options.next_since
    if not cursor:
        cursor = prior_watermark.get("next_since")
    if not cursor:
        cursor = meta.get("crawl_started_at")
    if not cursor:
        cursor = _infer_snapshot_next_since(source_dir, meta)
    if not cursor:
        cursor = meta.get("extracted_at")
    if not (isinstance(cursor, str) and cursor):
        raise ValueError(f"Could not determine next_since from {manifest_file}")

    target_root = options.output_dir.resolve()
    latest_file = target_root / "snapshots" / "latest.json"
    watermark_file = target_root / "state" / "watermark.json"

    latest_payload = {
        "repo": repo_name,
        "latest_snapshot_id": adopted_id,
        "snapshot_dir": str(source_dir),
        "manifest_path": str(manifest_file),
        "next_since": cursor,
    }
    write_json(latest_payload, latest_file)

    watermark_payload = {
        "repo": repo_name,
        "last_successful_snapshot_id": adopted_id,
        "snapshot_dir": str(source_dir),
        "effective_since": prior_watermark.get("effective_since"),
        "next_since": cursor,
        "updated_at": meta.get("imported_at") or meta.get("extracted_at") or cursor,
    }
    write_json(watermark_payload, watermark_file)

    return latest_file


def _infer_snapshot_next_since(snapshot_dir: Path, manifest: dict[str, object]) -> str | None:
    """Derive a ``next_since`` value from the timestamps stored in a snapshot.

    Only applies when the manifest's ``source_type`` is
    ``"hf_checkpoint_import"``; any other manifest yields ``None``.

    Tables are examined in a fixed priority order. For each row, the first
    listed column holding a non-empty string is taken as that row's timestamp.
    The greatest timestamp of the first table that has any is returned; later
    tables are not consulted.

    Args:
        snapshot_dir: Directory containing the snapshot's parquet files.
        manifest: The snapshot manifest.

    Returns:
        The greatest timestamp string found, or ``None`` if nothing qualifies.
    """
    if manifest.get("source_type") != "hf_checkpoint_import":
        return None

    # Insertion order is the priority order in which tables are consulted.
    timestamp_columns = {
        "pull_requests.parquet": ("updated_at", "created_at"),
        "issues.parquet": ("updated_at", "created_at"),
        "comments.parquet": ("updated_at", "created_at"),
        "reviews.parquet": ("submitted_at",),
        "review_comments.parquet": ("updated_at", "created_at"),
        "events.parquet": ("created_at",),
    }
    for table_name, columns in timestamp_columns.items():
        newest: str | None = None
        for record in read_parquet_rows(snapshot_dir / table_name):
            # First column, in listed order, that holds a non-empty string.
            stamp = next(
                (cell for cell in map(record.get, columns) if isinstance(cell, str) and cell),
                None,
            )
            # Plain string comparison; presumably the values are uniformly
            # formatted timestamps so this matches chronological order --
            # TODO confirm.
            if stamp is not None and (newest is None or stamp > newest):
                newest = stamp
        if newest is not None:
            return newest
    return None