#!/usr/bin/env bash
# Download and filter Lichess games by Elo range, output as Parquet.
# Streams zstd-compressed PGN, filters and parses headers, writes
# compressed Parquet incrementally (one row group per flush batch).
#
# Usage:
# bash scripts/extract_lichess.sh <year-month> <min_elo> <max_elo> <output_dir>
#
# Example:
# bash scripts/extract_lichess.sh 2025-01 1800 1900 /dev/shm/lichess
#
# Downloads from https://database.lichess.org/standard/
# Requires: zstd, python3, pyarrow
set -euo pipefail

# Required positional arguments; ${n:?} aborts with the usage string when missing.
YEAR_MONTH="${1:?Usage: $0 <year-month> <min_elo> <max_elo> <output_dir>}"
MIN_ELO="${2:?}"
MAX_ELO="${3:?}"
OUTPUT_DIR="${4:?}"

# Lichess publishes one zstd-compressed PGN dump per month.
URL="https://database.lichess.org/standard/lichess_db_standard_rated_${YEAR_MONTH}.pgn.zst"
OUTPUT_FILE="${OUTPUT_DIR}/lichess_${YEAR_MONTH}_elo${MIN_ELO}_${MAX_ELO}.parquet"

mkdir -p "$OUTPUT_DIR"

printf '%s\n' \
  "[${YEAR_MONTH}] Downloading and filtering: Elo ${MIN_ELO}-${MAX_ELO}" \
  "[${YEAR_MONTH}] Source: $URL" \
  "[${YEAR_MONTH}] Output: $OUTPUT_FILE"
# Stream: download -> decompress -> parse/filter/write Parquet
# NOTE(review): ${MIN_ELO}/${MAX_ELO}/${YEAR_MONTH}/${OUTPUT_FILE} are spliced
# directly into the Python source text below, so non-numeric arguments would be
# executed as code, not treated as data — acceptable for a trusted-operator
# script, but worth confirming callers never pass untrusted values.
curl -sL "$URL" | zstd -d | python3 -c "
import sys
import pyarrow as pa
import pyarrow.parquet as pq
# Constants interpolated by the enclosing bash wrapper before python runs.
min_elo = ${MIN_ELO}
max_elo = ${MAX_ELO}
month = '${YEAR_MONTH}'
output = '${OUTPUT_FILE}'
FLUSH_EVERY = 50_000 # rows per row group
# One column per PGN header of interest, plus movetext and source month.
# int16 is sufficient for Elo ratings (values are a few thousand at most).
schema = pa.schema([
    ('pgn', pa.string()),
    ('headers', pa.string()),
    ('white_elo', pa.int16()),
    ('black_elo', pa.int16()),
    ('result', pa.string()),
    ('time_control', pa.string()),
    ('opening', pa.string()),
    ('date', pa.string()),
    ('month', pa.string()),
])
# Incremental writer: each flushed batch becomes one zstd-compressed row group.
writer = pq.ParquetWriter(output, schema, compression='zstd')
# Accumulator for current batch (column name -> list of values).
batch = {name: [] for name in schema.names}
n_scanned = 0
n_matched = 0
# Current game state, filled line-by-line by the reader loop below.
header_lines = []
move_lines = []
headers = {}
in_moves = False
def parse_header(line):
    # Parse one PGN tag pair of the form [Key \"Value\"].
    # Returns (key, value), or (None, None) when the line is malformed
    # (no space after the key, or no quoted value).
    space_at = line.find(' ')
    if space_at == -1:
        return None, None
    pieces = line.split('\"')
    if len(pieces) < 2:
        return None, None
    return line[1:space_at], pieces[1]
def flush_game():
    # Append the just-parsed game to the in-memory batch if both players
    # fall inside [min_elo, max_elo]; otherwise drop it silently.
    # Reads the module-level header_lines / move_lines / headers that the
    # reader loop accumulated; increments n_matched on acceptance.
    global n_matched
    white_elo = headers.get('WhiteElo')
    black_elo = headers.get('BlackElo')
    # Skip games with missing ratings.
    if white_elo is None or black_elo is None:
        return
    # Skip games with non-numeric ratings (Lichess uses '?' for unrated).
    try:
        we = int(white_elo)
        be = int(black_elo)
    except ValueError:
        return
    # Both players must be inside the requested band.
    if not (min_elo <= we <= max_elo and min_elo <= be <= max_elo):
        return
    # Collapse multi-line movetext into one space-joined string.
    moves = ' '.join(line.strip() for line in move_lines if line.strip())
    header_block = ''.join(header_lines)
    batch['pgn'].append(moves)
    batch['headers'].append(header_block)
    batch['white_elo'].append(we)
    batch['black_elo'].append(be)
    batch['result'].append(headers.get('Result', ''))
    batch['time_control'].append(headers.get('TimeControl', ''))
    batch['opening'].append(headers.get('Opening', ''))
    # Prefer the precise UTCDate header; fall back to Date.
    batch['date'].append(headers.get('UTCDate', headers.get('Date', '')))
    batch['month'].append(month)
    n_matched += 1
def flush_batch():
    # Emit the accumulated rows as one Parquet row group, then clear the
    # accumulator so the next batch starts empty. No-op when empty.
    if batch['pgn']:
        writer.write_table(pa.table(batch, schema=schema))
        for col in batch:
            batch[col] = []
# Line-oriented PGN state machine. A game is: a run of '[Tag ...]' header
# lines, a blank line, then movetext lines, then a blank separator. Each
# '[Event ' line marks the start of the next game, so the previous one is
# flushed at that point (and once more after EOF for the final game).
for line in sys.stdin:
    if line.startswith('[Event '):
        # New game — flush previous
        if header_lines or move_lines:
            flush_game()
            n_scanned += 1
            # Progress heartbeat on stderr so stdout stays clean.
            if n_scanned % 1_000_000 == 0:
                print(f'[{month}] Scanned {n_scanned:,} games, matched {n_matched:,}',
                      file=sys.stderr)
            # Bound memory: write a row group once the batch is large enough.
            if len(batch['pgn']) >= FLUSH_EVERY:
                flush_batch()
        # Reset per-game state; the '[Event ...]' line itself is a header.
        header_lines = [line]
        move_lines = []
        headers = {}
        in_moves = False
        key, val = parse_header(line)
        if key:
            headers[key] = val
    elif line.startswith('['):
        # Additional tag pair for the current game.
        header_lines.append(line)
        key, val = parse_header(line)
        if key:
            headers[key] = val
        in_moves = False
    elif line.strip() == '':
        # Blank line after the header section starts the movetext section;
        # blank lines elsewhere (e.g. between games) are ignored.
        if header_lines and not in_moves:
            in_moves = True
    else:
        in_moves = True
        move_lines.append(line)
# Flush final game and remaining batch
if header_lines or move_lines:
    flush_game()
    n_scanned += 1
flush_batch()
writer.close()
print(f'[{month}] Done: scanned {n_scanned:,} games, matched {n_matched:,}', file=sys.stderr)
"
# Report the final compressed size (human-readable) alongside the path.
echo "[${YEAR_MONTH}] Output: $(du -h "$OUTPUT_FILE" | cut -f1) (${OUTPUT_FILE})"