pepijn223 HF Staff committed on
Commit
1f94f4a
·
unverified ·
1 Parent(s): f32c7fc

Load the episode’s row range for v3 datasets (based on the file’s start index)

Browse files
src/app/[org]/[dataset]/[episode]/fetch-data.ts CHANGED
@@ -748,7 +748,7 @@ async function loadEpisodeDataV3(
748
 
749
  try {
750
  const dataUrl = buildVersionedUrl(repoId, version, dataPath);
751
- const arrayBuffer = await fetchParquetFile(dataUrl);
752
  const v3DataColumns = Array.from(
753
  new Set([
754
  "index",
@@ -779,32 +779,44 @@ async function loadEpisodeDataV3(
779
  .map(([key]) => key),
780
  ]),
781
  );
782
- const fullData = await readParquetAsObjects(arrayBuffer, v3DataColumns);
783
-
784
  // Extract the episode-specific data slice
785
  const fromIndex = bigIntToNumber(episodeMetadata.dataset_from_index, 0);
786
- const toIndex = bigIntToNumber(
787
- episodeMetadata.dataset_to_index,
788
- fullData.length,
789
- );
790
 
791
- // Find the starting index of this parquet file by checking the first row's index
792
- // This handles the case where episodes are split across multiple parquet files
793
- let fileStartIndex = 0;
794
- if (fullData.length > 0 && fullData[0].index !== undefined) {
795
- fileStartIndex = Number(fullData[0].index);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
796
  }
797
 
798
- // Adjust indices to be relative to this file's starting position
799
- const localFromIndex = Math.max(0, fromIndex - fileStartIndex);
800
- const localToIndex = Math.min(fullData.length, toIndex - fileStartIndex);
801
 
802
- const episodeData = sampleRowsFromRange(
803
- fullData,
804
- localFromIndex,
805
- localToIndex,
806
- MAX_EPISODE_POINTS,
807
- );
808
 
809
  if (episodeData.length === 0) {
810
  return {
 
748
 
749
  try {
750
  const dataUrl = buildVersionedUrl(repoId, version, dataPath);
751
+ const parquetFile = await fetchParquetFile(dataUrl);
752
  const v3DataColumns = Array.from(
753
  new Set([
754
  "index",
 
779
  .map(([key]) => key),
780
  ]),
781
  );
 
 
782
  // Extract the episode-specific data slice
783
  const fromIndex = bigIntToNumber(episodeMetadata.dataset_from_index, 0);
784
+ let toIndex = bigIntToNumber(episodeMetadata.dataset_to_index, fromIndex);
785
+ if (toIndex <= fromIndex) {
786
+ toIndex = fromIndex + 1;
787
+ }
788
 
789
+ let episodeRows: Record<string, unknown>[] = [];
790
+ let usedRowRange = false;
791
+
792
+ try {
793
+ const indexPreview = await readParquetAsObjects(parquetFile, ["index"], {
794
+ rowStart: 0,
795
+ rowEnd: 1,
796
+ });
797
+ const startIndexValue = indexPreview[0]?.index;
798
+ if (startIndexValue !== undefined && startIndexValue !== null) {
799
+ const fileStartIndex =
800
+ typeof startIndexValue === "bigint"
801
+ ? Number(startIndexValue)
802
+ : Number(startIndexValue);
803
+ const localFromIndex = Math.max(0, fromIndex - fileStartIndex);
804
+ const localToIndex = Math.max(localFromIndex, toIndex - fileStartIndex);
805
+ episodeRows = await readParquetAsObjects(parquetFile, v3DataColumns, {
806
+ rowStart: localFromIndex,
807
+ rowEnd: localToIndex,
808
+ });
809
+ usedRowRange = true;
810
+ }
811
+ } catch {
812
+ // Fall back to full reads if row-range selection fails.
813
  }
814
 
815
+ if (!usedRowRange) {
816
+ episodeRows = await readParquetAsObjects(parquetFile, v3DataColumns);
817
+ }
818
 
819
+ const episodeData = evenlySampleArray(episodeRows, MAX_EPISODE_POINTS);
 
 
 
 
 
820
 
821
  if (episodeData.length === 0) {
822
  return {
src/utils/parquetUtils.ts CHANGED
@@ -1,4 +1,10 @@
1
- import { parquetRead, parquetReadObjects } from "hyparquet";
 
 
 
 
 
 
2
 
3
  export interface DatasetMetadata {
4
  codebase_version: string;
@@ -42,26 +48,36 @@ export function formatStringWithVars(
42
  }
43
 
44
  // Fetch and parse the Parquet file
45
- export async function fetchParquetFile(url: string): Promise<ArrayBuffer> {
46
- const res = await fetch(url, { cache: "no-store" });
47
 
48
- if (!res.ok) {
49
- throw new Error(`Failed to fetch ${url}: ${res.status} ${res.statusText}`);
50
- }
51
 
52
- return res.arrayBuffer();
 
 
 
 
 
 
 
 
 
 
53
  }
54
 
55
  // Read specific columns from the Parquet file
56
  export async function readParquetColumn(
57
- fileBuffer: ArrayBuffer,
58
  columns: string[],
 
59
  ): Promise<unknown[][]> {
60
  return new Promise((resolve, reject) => {
61
  try {
62
  parquetRead({
63
  file: fileBuffer,
64
  columns: columns.length > 0 ? columns : undefined,
 
 
65
  onComplete: (data: unknown[][]) => {
66
  resolve(data);
67
  },
@@ -73,12 +89,15 @@ export async function readParquetColumn(
73
  }
74
 
75
  export async function readParquetAsObjects(
76
- fileBuffer: ArrayBuffer,
77
  columns: string[] = [],
 
78
  ): Promise<Record<string, unknown>[]> {
79
  return parquetReadObjects({
80
  file: fileBuffer,
81
  columns: columns.length > 0 ? columns : undefined,
 
 
82
  }) as Promise<Record<string, unknown>[]>;
83
  }
84
 
 
1
+ import {
2
+ asyncBufferFromUrl,
3
+ cachedAsyncBuffer,
4
+ parquetRead,
5
+ parquetReadObjects,
6
+ type AsyncBuffer,
7
+ } from "hyparquet";
8
 
9
  export interface DatasetMetadata {
10
  codebase_version: string;
 
48
  }
49
 
50
  // Fetch and parse the Parquet file
51
+ type ParquetFile = ArrayBuffer | AsyncBuffer;
 
52
 
53
+ const parquetFileCache = new Map<string, AsyncBuffer>();
 
 
54
 
55
+ export async function fetchParquetFile(url: string): Promise<ParquetFile> {
56
+ const cached = parquetFileCache.get(url);
57
+ if (cached) return cached;
58
+
59
+ const file = await asyncBufferFromUrl({
60
+ url,
61
+ requestInit: { cache: "no-store" },
62
+ });
63
+ const wrapped = cachedAsyncBuffer(file);
64
+ parquetFileCache.set(url, wrapped);
65
+ return wrapped;
66
  }
67
 
68
  // Read specific columns from the Parquet file
69
  export async function readParquetColumn(
70
+ fileBuffer: ParquetFile,
71
  columns: string[],
72
+ options?: { rowStart?: number; rowEnd?: number },
73
  ): Promise<unknown[][]> {
74
  return new Promise((resolve, reject) => {
75
  try {
76
  parquetRead({
77
  file: fileBuffer,
78
  columns: columns.length > 0 ? columns : undefined,
79
+ rowStart: options?.rowStart,
80
+ rowEnd: options?.rowEnd,
81
  onComplete: (data: unknown[][]) => {
82
  resolve(data);
83
  },
 
89
  }
90
 
91
  export async function readParquetAsObjects(
92
+ fileBuffer: ParquetFile,
93
  columns: string[] = [],
94
+ options?: { rowStart?: number; rowEnd?: number },
95
  ): Promise<Record<string, unknown>[]> {
96
  return parquetReadObjects({
97
  file: fileBuffer,
98
  columns: columns.length > 0 ? columns : undefined,
99
+ rowStart: options?.rowStart,
100
+ rowEnd: options?.rowEnd,
101
  }) as Promise<Record<string, unknown>[]>;
102
  }
103