pepijn223 HF Staff committed on
Commit
0ec6cb5
·
unverified ·
1 Parent(s): 08cc322

memory cap and cache cleanup

Browse files
src/app/[org]/[dataset]/[episode]/fetch-data.ts CHANGED
@@ -105,6 +105,81 @@ type ColumnDef = {
105
  value: string[];
106
  };
107
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
  export async function getEpisodeData(
109
  org: string,
110
  dataset: string,
@@ -328,7 +403,16 @@ async function getEpisodeDataV2(
328
  );
329
 
330
  const arrayBuffer = await fetchParquetFile(parquetUrl);
331
- const allData = await readParquetAsObjects(arrayBuffer, []);
 
 
 
 
 
 
 
 
 
332
 
333
  // Extract task from language_instruction fields, task field, or tasks.jsonl
334
  let task: string | undefined;
@@ -410,6 +494,7 @@ async function getEpisodeDataV2(
410
  }
411
  return obj;
412
  });
 
413
 
414
  // List of columns that are ignored (e.g., 2D or 3D data)
415
  const ignoredColumns = Object.entries(info.features)
@@ -420,12 +505,15 @@ async function getEpisodeDataV2(
420
  .map(([key]) => key);
421
 
422
  // Process chart data into organized groups using utility function
423
- const chartGroups = processChartDataGroups(seriesNames, chartData);
424
 
425
- const duration = chartData[chartData.length - 1].timestamp;
 
 
 
426
 
427
  const chartDataGroups = chartGroups.map((group) =>
428
- chartData.map((row) => {
429
  const grouped = groupRowBySuffix(pick(row, [...group, "timestamp"]));
430
  // Ensure timestamp is always a number at the top level
431
  return {
@@ -441,7 +529,7 @@ async function getEpisodeDataV2(
441
  episodeId,
442
  videosInfo,
443
  chartDataGroups,
444
- flatChartData: chartData,
445
  episodes,
446
  ignoredColumns,
447
  duration,
@@ -526,7 +614,37 @@ async function loadEpisodeDataV3(
526
  try {
527
  const dataUrl = buildVersionedUrl(repoId, version, dataPath);
528
  const arrayBuffer = await fetchParquetFile(dataUrl);
529
- const fullData = await readParquetAsObjects(arrayBuffer, []);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
530
 
531
  // Extract the episode-specific data slice
532
  const fromIndex = bigIntToNumber(episodeMetadata.dataset_from_index, 0);
@@ -546,7 +664,12 @@ async function loadEpisodeDataV3(
546
  const localFromIndex = Math.max(0, fromIndex - fileStartIndex);
547
  const localToIndex = Math.min(fullData.length, toIndex - fileStartIndex);
548
 
549
- const episodeData = fullData.slice(localFromIndex, localToIndex);
 
 
 
 
 
550
 
551
  if (episodeData.length === 0) {
552
  return {
@@ -1258,6 +1381,10 @@ export async function loadAllEpisodeFrameInfo(
1258
  const cameras = videoFeatures.map(([key]) => key);
1259
  const framesByCamera: Record<string, EpisodeFrameInfo[]> = {};
1260
  for (const cam of cameras) framesByCamera[cam] = [];
 
 
 
 
1261
 
1262
  if (version === "v3.0") {
1263
  let fileIndex = 0;
@@ -1271,6 +1398,7 @@ export async function loadAllEpisodeFrameInfo(
1271
  if (rows.length === 0 && fileIndex > 0) break;
1272
  for (const row of rows) {
1273
  const epIdx = Number(row["episode_index"] ?? 0);
 
1274
  for (const cam of cameras) {
1275
  const cIdx = Number(
1276
  row[`videos/${cam}/chunk_index`] ?? row["video_chunk_index"] ?? 0,
@@ -1307,6 +1435,7 @@ export async function loadAllEpisodeFrameInfo(
1307
 
1308
  // v2.x — construct URLs from template
1309
  for (let i = 0; i < info.total_episodes; i++) {
 
1310
  const chunk = Math.floor(i / (info.chunks_size || 1000));
1311
  for (const cam of cameras) {
1312
  const videoPath = formatStringWithVars(info.video_path, {
@@ -1391,9 +1520,10 @@ export async function loadCrossEpisodeActionVariance(
1391
  version: string,
1392
  info: DatasetMetadata,
1393
  fps: number,
1394
- maxEpisodes = 500,
1395
  numTimeBins = 50,
1396
  ): Promise<CrossEpisodeVarianceData | null> {
 
1397
  const actionEntry = Object.entries(info.features).find(
1398
  ([key, f]) => key === "action" && f.shape.length === 1,
1399
  );
@@ -1476,17 +1606,21 @@ export async function loadCrossEpisodeActionVariance(
1476
  return null;
1477
  }
1478
  console.log(
1479
- `[cross-ep] Found ${allEps.length} episodes in metadata, sampling up to ${maxEpisodes}`,
1480
  );
1481
 
1482
  // Sample episodes evenly
1483
  const sampled =
1484
- allEps.length <= maxEpisodes
1485
  ? allEps
1486
  : Array.from(
1487
- { length: maxEpisodes },
1488
  (_, i) =>
1489
- allEps[Math.round((i * (allEps.length - 1)) / (maxEpisodes - 1))],
 
 
 
 
1490
  );
1491
 
1492
  // Load action (and state) data per episode
@@ -1508,7 +1642,10 @@ export async function loadCrossEpisodeActionVariance(
1508
  const buf = await fetchParquetFile(
1509
  buildVersionedUrl(repoId, version, dataPath),
1510
  );
1511
- const rows = await readParquetAsObjects(buf, []);
 
 
 
1512
  const fileStart =
1513
  rows.length > 0 && rows[0].index !== undefined
1514
  ? Number(rows[0].index)
@@ -1528,9 +1665,18 @@ export async function loadCrossEpisodeActionVariance(
1528
  }
1529
  }
1530
  if (actions.length > 0) {
1531
- episodeActions.push({ index: ep.index, actions });
 
 
 
 
 
 
 
 
 
1532
  episodeStates.push(
1533
- stateKey && states.length === actions.length ? states : null,
1534
  );
1535
  }
1536
  }
@@ -1550,7 +1696,10 @@ export async function loadCrossEpisodeActionVariance(
1550
  const buf = await fetchParquetFile(
1551
  buildVersionedUrl(repoId, version, dataPath),
1552
  );
1553
- const rows = await readParquetAsObjects(buf, []);
 
 
 
1554
  const actions: number[][] = [];
1555
  const states: number[][] = [];
1556
  for (const row of rows) {
@@ -1571,9 +1720,18 @@ export async function loadCrossEpisodeActionVariance(
1571
  }
1572
  }
1573
  if (actions.length > 0) {
1574
- episodeActions.push({ index: ep.index, actions });
 
 
 
 
 
 
 
 
 
1575
  episodeStates.push(
1576
- stateKey && states.length === actions.length ? states : null,
1577
  );
1578
  }
1579
  } catch {
 
105
  value: string[];
106
  };
107
 
108
+ function parsePositiveIntEnv(
109
+ value: string | undefined,
110
+ fallback: number,
111
+ min = 1,
112
+ ): number {
113
+ const parsed = Number.parseInt(value ?? "", 10);
114
+ return Number.isFinite(parsed) && parsed >= min ? parsed : fallback;
115
+ }
116
+
117
// Memory caps for server-side parquet processing. Each cap is tunable via an
// environment variable of the same name; unparsable or too-small values fall
// back to the default (see parsePositiveIntEnv).

// Max chart data points kept per episode (floor 100, default 4000).
const MAX_EPISODE_POINTS = parsePositiveIntEnv(
  process.env.MAX_EPISODE_POINTS,
  4000,
  100,
);
// Max episodes scanned when building the all-episode frame overview.
const MAX_FRAMES_OVERVIEW_EPISODES = parsePositiveIntEnv(
  process.env.MAX_FRAMES_OVERVIEW_EPISODES,
  3000,
  100,
);
// Max episodes sampled for cross-episode action-variance analysis.
const MAX_CROSS_EPISODE_SAMPLE = parsePositiveIntEnv(
  process.env.MAX_CROSS_EPISODE_SAMPLE,
  120,
  10,
);
// Max frames retained per episode within the cross-episode analysis.
const MAX_CROSS_EPISODE_FRAMES_PER_EPISODE = parsePositiveIntEnv(
  process.env.MAX_CROSS_EPISODE_FRAMES_PER_EPISODE,
  2500,
  100,
);
137
+
138
+ function evenlySampleIndices(length: number, target: number): number[] {
139
+ if (length <= 0) return [];
140
+ if (target >= length) return Array.from({ length }, (_, i) => i);
141
+ if (target <= 1) return [0];
142
+
143
+ const sampled = new Set<number>();
144
+ for (let i = 0; i < target; i++) {
145
+ sampled.add(Math.round((i * (length - 1)) / (target - 1)));
146
+ }
147
+
148
+ // Fill potential gaps caused by rounding collisions.
149
+ if (sampled.size < target) {
150
+ for (let i = 0; i < length && sampled.size < target; i++) {
151
+ sampled.add(i);
152
+ }
153
+ }
154
+
155
+ return Array.from(sampled).sort((a, b) => a - b);
156
+ }
157
+
158
+ function evenlySampleArray<T>(items: T[], maxCount: number): T[] {
159
+ if (items.length <= maxCount) return items;
160
+ return evenlySampleIndices(items.length, maxCount).map((idx) => items[idx]);
161
+ }
162
+
163
+ function sampleRowsFromRange(
164
+ rows: Record<string, unknown>[],
165
+ from: number,
166
+ to: number,
167
+ maxCount: number,
168
+ ): Record<string, unknown>[] {
169
+ const length = Math.max(0, to - from);
170
+ if (length === 0) return [];
171
+ if (length <= maxCount) return rows.slice(from, to);
172
+ return evenlySampleIndices(length, maxCount).map((idx) => rows[from + idx]);
173
+ }
174
+
175
+ function buildSampledEpisodeSet(
176
+ totalEpisodes: number,
177
+ maxEpisodes: number,
178
+ ): Set<number> | null {
179
+ if (totalEpisodes <= maxEpisodes) return null;
180
+ return new Set(evenlySampleIndices(totalEpisodes, maxEpisodes));
181
+ }
182
+
183
  export async function getEpisodeData(
184
  org: string,
185
  dataset: string,
 
403
  );
404
 
405
  const arrayBuffer = await fetchParquetFile(parquetUrl);
406
+ const parquetColumns = Array.from(
407
+ new Set([
408
+ "timestamp",
409
+ "task",
410
+ "task_index",
411
+ "language_instruction",
412
+ ...filteredColumns.map((c) => c.key),
413
+ ]),
414
+ );
415
+ const allData = await readParquetAsObjects(arrayBuffer, parquetColumns);
416
 
417
  // Extract task from language_instruction fields, task field, or tasks.jsonl
418
  let task: string | undefined;
 
494
  }
495
  return obj;
496
  });
497
+ const sampledChartData = evenlySampleArray(chartData, MAX_EPISODE_POINTS);
498
 
499
  // List of columns that are ignored (e.g., 2D or 3D data)
500
  const ignoredColumns = Object.entries(info.features)
 
505
  .map(([key]) => key);
506
 
507
  // Process chart data into organized groups using utility function
508
+ const chartGroups = processChartDataGroups(seriesNames, sampledChartData);
509
 
510
+ const duration =
511
+ sampledChartData.length > 0
512
+ ? sampledChartData[sampledChartData.length - 1].timestamp
513
+ : 0;
514
 
515
  const chartDataGroups = chartGroups.map((group) =>
516
+ sampledChartData.map((row) => {
517
  const grouped = groupRowBySuffix(pick(row, [...group, "timestamp"]));
518
  // Ensure timestamp is always a number at the top level
519
  return {
 
529
  episodeId,
530
  videosInfo,
531
  chartDataGroups,
532
+ flatChartData: sampledChartData,
533
  episodes,
534
  ignoredColumns,
535
  duration,
 
614
  try {
615
  const dataUrl = buildVersionedUrl(repoId, version, dataPath);
616
  const arrayBuffer = await fetchParquetFile(dataUrl);
617
+ const v3DataColumns = Array.from(
618
+ new Set([
619
+ "index",
620
+ "timestamp",
621
+ "task_index",
622
+ "language_instruction",
623
+ "language_instruction_2",
624
+ "language_instruction_3",
625
+ ...Object.entries(info.features)
626
+ .filter(([, feature]) => {
627
+ const dtype = feature.dtype.toLowerCase();
628
+ const isNumericOrBool = [
629
+ "float32",
630
+ "float64",
631
+ "int8",
632
+ "int16",
633
+ "int32",
634
+ "int64",
635
+ "uint8",
636
+ "uint16",
637
+ "uint32",
638
+ "uint64",
639
+ "bool",
640
+ "boolean",
641
+ ].includes(dtype);
642
+ return isNumericOrBool && feature.shape.length <= 1;
643
+ })
644
+ .map(([key]) => key),
645
+ ]),
646
+ );
647
+ const fullData = await readParquetAsObjects(arrayBuffer, v3DataColumns);
648
 
649
  // Extract the episode-specific data slice
650
  const fromIndex = bigIntToNumber(episodeMetadata.dataset_from_index, 0);
 
664
  const localFromIndex = Math.max(0, fromIndex - fileStartIndex);
665
  const localToIndex = Math.min(fullData.length, toIndex - fileStartIndex);
666
 
667
+ const episodeData = sampleRowsFromRange(
668
+ fullData,
669
+ localFromIndex,
670
+ localToIndex,
671
+ MAX_EPISODE_POINTS,
672
+ );
673
 
674
  if (episodeData.length === 0) {
675
  return {
 
1381
  const cameras = videoFeatures.map(([key]) => key);
1382
  const framesByCamera: Record<string, EpisodeFrameInfo[]> = {};
1383
  for (const cam of cameras) framesByCamera[cam] = [];
1384
+ const sampledEpisodeSet = buildSampledEpisodeSet(
1385
+ info.total_episodes,
1386
+ MAX_FRAMES_OVERVIEW_EPISODES,
1387
+ );
1388
 
1389
  if (version === "v3.0") {
1390
  let fileIndex = 0;
 
1398
  if (rows.length === 0 && fileIndex > 0) break;
1399
  for (const row of rows) {
1400
  const epIdx = Number(row["episode_index"] ?? 0);
1401
+ if (sampledEpisodeSet && !sampledEpisodeSet.has(epIdx)) continue;
1402
  for (const cam of cameras) {
1403
  const cIdx = Number(
1404
  row[`videos/${cam}/chunk_index`] ?? row["video_chunk_index"] ?? 0,
 
1435
 
1436
  // v2.x — construct URLs from template
1437
  for (let i = 0; i < info.total_episodes; i++) {
1438
+ if (sampledEpisodeSet && !sampledEpisodeSet.has(i)) continue;
1439
  const chunk = Math.floor(i / (info.chunks_size || 1000));
1440
  for (const cam of cameras) {
1441
  const videoPath = formatStringWithVars(info.video_path, {
 
1520
  version: string,
1521
  info: DatasetMetadata,
1522
  fps: number,
1523
+ maxEpisodes = MAX_CROSS_EPISODE_SAMPLE,
1524
  numTimeBins = 50,
1525
  ): Promise<CrossEpisodeVarianceData | null> {
1526
+ const cappedMaxEpisodes = Math.min(maxEpisodes, MAX_CROSS_EPISODE_SAMPLE);
1527
  const actionEntry = Object.entries(info.features).find(
1528
  ([key, f]) => key === "action" && f.shape.length === 1,
1529
  );
 
1606
  return null;
1607
  }
1608
  console.log(
1609
+ `[cross-ep] Found ${allEps.length} episodes in metadata, sampling up to ${cappedMaxEpisodes}`,
1610
  );
1611
 
1612
  // Sample episodes evenly
1613
  const sampled =
1614
+ allEps.length <= cappedMaxEpisodes
1615
  ? allEps
1616
  : Array.from(
1617
+ { length: cappedMaxEpisodes },
1618
  (_, i) =>
1619
+ allEps[
1620
+ Math.round(
1621
+ (i * (allEps.length - 1)) / (cappedMaxEpisodes - 1),
1622
+ )
1623
+ ],
1624
  );
1625
 
1626
  // Load action (and state) data per episode
 
1642
  const buf = await fetchParquetFile(
1643
  buildVersionedUrl(repoId, version, dataPath),
1644
  );
1645
+ const rows = await readParquetAsObjects(
1646
+ buf,
1647
+ stateKey ? ["index", actionKey, stateKey] : ["index", actionKey],
1648
+ );
1649
  const fileStart =
1650
  rows.length > 0 && rows[0].index !== undefined
1651
  ? Number(rows[0].index)
 
1665
  }
1666
  }
1667
  if (actions.length > 0) {
1668
+ const sampledIndices = evenlySampleIndices(
1669
+ actions.length,
1670
+ Math.min(actions.length, MAX_CROSS_EPISODE_FRAMES_PER_EPISODE),
1671
+ );
1672
+ const sampledActions = sampledIndices.map((i) => actions[i]);
1673
+ const sampledStates =
1674
+ stateKey && states.length === actions.length
1675
+ ? sampledIndices.map((i) => states[i])
1676
+ : null;
1677
+ episodeActions.push({ index: ep.index, actions: sampledActions });
1678
  episodeStates.push(
1679
+ stateKey ? sampledStates : null,
1680
  );
1681
  }
1682
  }
 
1696
  const buf = await fetchParquetFile(
1697
  buildVersionedUrl(repoId, version, dataPath),
1698
  );
1699
+ const rows = await readParquetAsObjects(
1700
+ buf,
1701
+ stateKey ? [actionKey, stateKey] : [actionKey],
1702
+ );
1703
  const actions: number[][] = [];
1704
  const states: number[][] = [];
1705
  for (const row of rows) {
 
1720
  }
1721
  }
1722
  if (actions.length > 0) {
1723
+ const sampledIndices = evenlySampleIndices(
1724
+ actions.length,
1725
+ Math.min(actions.length, MAX_CROSS_EPISODE_FRAMES_PER_EPISODE),
1726
+ );
1727
+ const sampledActions = sampledIndices.map((i) => actions[i]);
1728
+ const sampledStates =
1729
+ stateKey && states.length === actions.length
1730
+ ? sampledIndices.map((i) => states[i])
1731
+ : null;
1732
+ episodeActions.push({ index: ep.index, actions: sampledActions });
1733
  episodeStates.push(
1734
+ stateKey ? sampledStates : null,
1735
  );
1736
  }
1737
  } catch {
src/utils/versionUtils.ts CHANGED
@@ -37,11 +37,36 @@ const datasetInfoCache = new Map<
37
  { data: DatasetInfo; expiry: number }
38
  >();
39
  const CACHE_TTL_MS = 5 * 60 * 1000;
40
- const CACHE_MAX_SIZE = 200;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  export async function getDatasetInfo(repoId: string): Promise<DatasetInfo> {
 
 
 
43
  const cached = datasetInfoCache.get(repoId);
44
- if (cached && Date.now() < cached.expiry) {
 
 
 
45
  console.log(`[perf] getDatasetInfo cache HIT for ${repoId}`);
46
  return cached.data;
47
  }
@@ -81,6 +106,7 @@ export async function getDatasetInfo(repoId: string): Promise<DatasetInfo> {
81
  data: data as DatasetInfo,
82
  expiry: Date.now() + CACHE_TTL_MS,
83
  });
 
84
  return data as DatasetInfo;
85
  } catch (error) {
86
  if (error instanceof Error) {
 
37
  { data: DatasetInfo; expiry: number }
38
  >();
39
  const CACHE_TTL_MS = 5 * 60 * 1000;
40
// Upper bound on cached dataset-info entries; tunable via env var.
// `|| 64` deliberately treats NaN (unparsable value) and 0 as "use the
// default"; Math.max then enforces a hard floor of 8 entries.
const MAX_CACHE_ENTRIES = Math.max(
  8,
  parseInt(process.env.MAX_DATASET_INFO_CACHE_ENTRIES ?? "64", 10) || 64,
);
44
+
45
+ function pruneDatasetInfoCache(now: number) {
46
+ // Remove expired entries first.
47
+ for (const [key, value] of datasetInfoCache) {
48
+ if (now >= value.expiry) {
49
+ datasetInfoCache.delete(key);
50
+ }
51
+ }
52
+
53
+ // Then cap overall cache size to prevent unbounded growth.
54
+ while (datasetInfoCache.size > MAX_CACHE_ENTRIES) {
55
+ const oldestKey = datasetInfoCache.keys().next().value;
56
+ if (!oldestKey) break;
57
+ datasetInfoCache.delete(oldestKey);
58
+ }
59
+ }
60
 
61
  export async function getDatasetInfo(repoId: string): Promise<DatasetInfo> {
62
+ const now = Date.now();
63
+ pruneDatasetInfoCache(now);
64
+
65
  const cached = datasetInfoCache.get(repoId);
66
+ if (cached && now < cached.expiry) {
67
+ // Keep insertion order fresh so the cache behaves closer to LRU.
68
+ datasetInfoCache.delete(repoId);
69
+ datasetInfoCache.set(repoId, cached);
70
  console.log(`[perf] getDatasetInfo cache HIT for ${repoId}`);
71
  return cached.data;
72
  }
 
106
  data: data as DatasetInfo,
107
  expiry: Date.now() + CACHE_TTL_MS,
108
  });
109
+ pruneDatasetInfoCache(Date.now());
110
  return data as DatasetInfo;
111
  } catch (error) {
112
  if (error instanceof Error) {