import os
import json
import time
import socket
import threading
import io
import requests
import pandas as pd
from pathlib import Path
from tokenizers import Tokenizer
from huggingface_hub import HfApi
# ── Config ───────────────────────────────────────────────────────────────────
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO = "Neon-coding/github-code-raw"
TOK_PATH = "/data/tokenizer.json"
OUT_DIR = "/data/by-language"
STATE_FILE = "/data/progress_state.json"
TOTAL_PARQUETS = 880
SHARD_TOKENS = 50_000_000 # 50M tokens per shard
PARQUET_URL = (
    "https://huggingface.co/datasets/codeparrot/github-code-clean"
    "/resolve/main/data/train-{i:05d}-of-00880.parquet"
)
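# NOTE: the 05d zero-padding must match the dataset's own shard naming
# (train-00000-of-00880.parquet ... train-00879-of-00880.parquet).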
os.makedirs(OUT_DIR, exist_ok=True)
api = HfApi(token=HF_TOKEN)
# ── Port 7860: keeps the Space green ─────────────────────────────────────────
def serve():
    """Minimal HTTP responder so the Space's health check sees an open port."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", 7860))
    s.listen(5)
    print("✓ Listening on port 7860")
    while True:
        conn, _ = s.accept()
        # sendall avoids silently dropping bytes on a partial send
        conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        conn.close()
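# Quick sanity check from inside the container (illustrative):
#   curl -s localhost:7860   ->   OK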
# ── State ────────────────────────────────────────────────────────────────────
def load_state():
    """Load resume state from disk, or initialise a fresh run."""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            state = json.load(f)
        print(f"Resuming → {len(state['done'])} / {TOTAL_PARQUETS} parquets done")
    else:
        state = {
            "done": [],          # parquet indices fully processed
            "lang_shards": {},   # language -> next shard index
            "lang_tokens": {},   # language -> total tokens flushed
        }
        print("Starting fresh")
    return state
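# progress_state.json shape (illustrative):
#   {"done": [0, 1, 2], "lang_shards": {"Python": 4}, "lang_tokens": {"Python": 200000000}}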
def save_state(state, retries=3, delay=5):
    """Persist progress to disk, retrying briefly on transient I/O errors."""
    for attempt in range(retries):
        try:
            with open(STATE_FILE, "w") as f:
                json.dump(state, f, indent=2)
            return
        except OSError as e:
            print(f" ⚠ State save attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
    print(" ⚠ State save failed after all retries, continuing")
# ── Shard buffers: global per language, persist across parquets ──────────────
buffers = {}
def get_buffer(lang):
    """Return the in-memory buffer for a language, creating it on first use."""
    if lang not in buffers:
        buffers[lang] = {"rows": [], "token_count": 0}
    return buffers[lang]
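# Only the single process() worker thread touches `buffers`, so no lock is
# needed even though serve() runs concurrently.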
def flush_shard(lang, rows, state):
    """Write one JSONL shard for a language and update its counters in state."""
    shard_idx = state["lang_shards"].get(lang, 0)
    lang_dir = Path(OUT_DIR) / lang
    lang_dir.mkdir(parents=True, exist_ok=True)
    shard_name = f"shard_{shard_idx:06d}.jsonl"
    shard_path = lang_dir / shard_name
    with open(shard_path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
    tok_in_shard = sum(r["token_count"] for r in rows)
    state["lang_shards"][lang] = shard_idx + 1
    state["lang_tokens"][lang] = state["lang_tokens"].get(lang, 0) + tok_in_shard
    print(f" ✓ {lang}/{shard_name} | {len(rows)} samples | {tok_in_shard:,} tokens")
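# flush_shard only mutates state in memory; callers invoke save_state() right
# after each flush so shard counters survive a restart.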
# ── Main processing loop ─────────────────────────────────────────────────────
def process(tokenizer, state):
    """Download each parquet, tokenize row by row, and shard output by language."""
    for i in range(TOTAL_PARQUETS):
        if i in state["done"]:
            print(f"[{i:06d}/{TOTAL_PARQUETS}] SKIP")
            continue
        url = PARQUET_URL.format(i=i)
        print(f"[{i:06d}/{TOTAL_PARQUETS}] Downloading...")
        try:
            resp = requests.get(
                url,
                headers={"Authorization": f"Bearer {HF_TOKEN}"},
                timeout=180,
            )
            resp.raise_for_status()
            df = pd.read_parquet(io.BytesIO(resp.content))
        except Exception as e:
            print(f"[{i:06d}] Download error: {e} → skipping")
            continue
        print(f"[{i:06d}] {len(df):,} rows | {df['language'].nunique()} languages")
        # row by row → constant memory
        for row_tuple in df.itertuples(index=False):
            lang = row_tuple.language
            # guard against missing/NaN code cells, which are truthy floats in pandas
            text = row_tuple.code if isinstance(row_tuple.code, str) else ""
            repo = row_tuple.repo_name
            fpath = row_tuple.path
            lic = row_tuple.license
            if not text.strip():
                continue
            enc = tokenizer.encode(text)
            token_count = len(enc.ids)
            if token_count < 2:
                continue
            buf = get_buffer(lang)
            row = {
                "text": text,
                "token_count": token_count,
                "repo": repo,
                "path": fpath,
                "license": lic,
            }
            # flush before appending so a shard never exceeds SHARD_TOKENS
            if buf["token_count"] + token_count > SHARD_TOKENS and buf["rows"]:
                flush_shard(lang, buf["rows"], state)
                save_state(state)
                buf["rows"] = []
                buf["token_count"] = 0
            buf["rows"].append(row)
            buf["token_count"] += token_count
        del df
        state["done"].append(i)
        save_state(state)
        print(f"[{i:06d}] ✓ Complete")
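    # Any rows still buffered below SHARD_TOKENS were never written or counted;
    # if the Space restarts here they are lost, since their source parquets are
    # already marked done. Token totals stay consistent because counting
    # happens only at flush time.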
    # ── Flush remaining partial shards ────────────────────────────────────────
    print("\nFlushing remaining buffers...")
    for lang, buf in buffers.items():
        if buf["rows"]:
            flush_shard(lang, buf["rows"], state)
            save_state(state)
    # ── Write meta.json per language ──────────────────────────────────────────
    print("\nWriting meta.json per language...")
    for lang in state["lang_tokens"]:
        meta = {
            "language": lang,
            "total_tokens": state["lang_tokens"][lang],
            "total_shards": state["lang_shards"].get(lang, 0),
        }
        meta_path = Path(OUT_DIR) / lang / "meta.json"
        with open(meta_path, "w") as f:
            json.dump(meta, f, indent=2)
        print(f"  {lang}: {meta['total_tokens']:,} tokens | {meta['total_shards']} shards")
    # ── Push everything to the HF dataset repo ────────────────────────────────
    print(f"\nPushing to {DATASET_REPO}...")
    api.upload_folder(
        folder_path=OUT_DIR,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    print("\n✓ All done!")
# ── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    threading.Thread(target=serve, daemon=True).start()
    print("→ Loading tokenizer from /data/tokenizer.json...")
    tokenizer = Tokenizer.from_file(TOK_PATH)
    print(f"✓ Tokenizer loaded | vocab: {tokenizer.get_vocab_size():,}")
    state = load_state()
    threading.Thread(target=process, args=(tokenizer, state), daemon=True).start()
    # Keep the main thread alive; serve() and process() run as daemon threads.
    while True:
        time.sleep(60)
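# Resulting dataset layout (illustrative):
#   by-language/
#     Python/
#       shard_000000.jsonl   # one JSON object per line: text, token_count, repo, path, license
#       meta.json            # {"language", "total_tokens", "total_shards"}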