Neon-tech committed on
Commit
6bf31b2
·
verified ·
1 Parent(s): 9b6bcf4

Update app.py

Files changed (1)
  1. app.py +200 -104
app.py CHANGED
@@ -1,118 +1,214 @@
- # process.py
- from huggingface_hub import snapshot_download, HfApi
- from tokenizers import Tokenizer
  import os
  import json
  import threading
  import requests
  import pandas as pd
  from pathlib import Path
- from concurrent.futures import ThreadPoolExecutor
-
- # ── Config ──
- HF_USERNAME = "Neon-coding"
- DATASET_NAME = "github-code-raw"
- RAW_DIR = "/data/codeparrot-raw"
- OUT_DIR = "/data/by-language"
- STATE_FILE = "/data/progress_state.json"
- TOK_PATH = "/data/tokenizer.json"

  os.makedirs(OUT_DIR, exist_ok=True)
- os.makedirs(RAW_DIR, exist_ok=True)
-
- # ── Load tokenizer (already in bucket) ──
- print("✓ Loading tokenizer from bucket...")
- tokenizer = Tokenizer.from_file(TOK_PATH)
- SEP_TOKEN = tokenizer.token_to_id("<eos>")
- print(f"✓ Tokenizer loaded | vocab: {tokenizer.get_vocab_size():,}")
-
- # ── Load state ──
- if os.path.exists(STATE_FILE):
-     with open(STATE_FILE) as f:
-         state = json.load(f)
-     print(f"Resuming — {len(state['processed_files'])} files already done")
- else:
-     state = {"processed_files": [], "lang_tokens": {}}
-     print("Starting fresh")
-
- lock = threading.Lock()
-
- def save_state():
      with open(STATE_FILE, "w") as f:
          json.dump(state, f, indent=2)

- # ── Download codeparrot ──
- print("\nDownloading codeparrot/github-code-clean...")
- local_dir = snapshot_download(
-     repo_id="codeparrot/github-code-clean",
-     repo_type="dataset",
-     local_dir=RAW_DIR,
- )
-
- parquet_files = sorted(Path(local_dir).rglob("*.parquet"))
- print(f"Found {len(parquet_files)} parquet files")
-
- # ── Process each file ──
- def process_file(path):
-     fname = str(path)
-
-     if fname in state["processed_files"]:
-         print(f" SKIP {path.name}")
-         return
-
-     try:
-         df = pd.read_parquet(path)
-
-         for lang, group in df.groupby("language"):
-             lang_dir = os.path.join(OUT_DIR, lang)
-             os.makedirs(lang_dir, exist_ok=True)
-             out = os.path.join(lang_dir, f"{path.stem}.jsonl")
-
-             if os.path.exists(out):
-                 continue
-
-             texts = group["code"].dropna().tolist()
-             encoded = tokenizer.encode_batch(texts)
-             tok_count = sum(len(e.ids) for e in encoded)
-
-             group[["code"]].rename(columns={"code": "text"}).to_json(
-                 out, orient="records", lines=True
              )

-             with lock:
-                 state["lang_tokens"][lang] = state["lang_tokens"].get(lang, 0) + tok_count
-
-         with lock:
-             state["processed_files"].append(fname)
-             save_state()
-
-         print(f" ✓ {path.name} | langs: {list(df['language'].unique())}")
-
-     except Exception as e:
-         print(f" ✗ {path.name} ERROR: {e}")
-

- with ThreadPoolExecutor(max_workers=8) as ex:
-     list(ex.map(process_file, parquet_files))
-
- # ── Save per-language meta ──
- print("\nSaving per-language meta.json...")
- for lang, total_tokens in state["lang_tokens"].items():
-     lang_dir = os.path.join(OUT_DIR, lang)
-     os.makedirs(lang_dir, exist_ok=True)
-     with open(os.path.join(lang_dir, "meta.json"), "w") as f:
-         json.dump({"language": lang, "total_tokens": total_tokens}, f, indent=2)
-     print(f" {lang}: {total_tokens:,}")
-
- # ── Push to HF ──
- print("\nPushing to HuggingFace...")
- api = HfApi()
- api.upload_folder(
-     folder_path=OUT_DIR,
-     repo_id=f"{HF_USERNAME}/{DATASET_NAME}",
-     repo_type="dataset",
- )
-
- print("\nDone!")
- for l, t in sorted(state["lang_tokens"].items(), key=lambda x: -x[1]):
-     print(f" {l}: {t:,}")

  import os
  import json
+ import time
  import threading
+ import io
  import requests
  import pandas as pd
  from pathlib import Path
+ from tokenizers import Tokenizer
+ from huggingface_hub import HfApi
+
+ # ── Config ───────────────────────────────────────────────────────────────────
+ HF_TOKEN = os.environ.get("HF_TOKEN")
+ HF_USERNAME = "Neon-coding"
+ DATASET_REPO = f"{HF_USERNAME}/github-code-raw"
+ BUCKET_REPO = f"{HF_USERNAME}/ureola-bucket"  # where tokenizer.json lives
+ OUT_DIR = "/data/by-language"
+ STATE_FILE = "/data/progress_state.json"
+ TOK_FILENAME = "tokenizer.json"
+ TOTAL_PARQUETS = 880
+ SHARD_TOKENS = 100_000  # exactly 100k tokens per shard file
+
+ PARQUET_URL = (
+     "https://huggingface.co/datasets/codeparrot/github-code-clean"
+     "/resolve/main/data/train-{i:05d}-of-00880.parquet"
+ )

  os.makedirs(OUT_DIR, exist_ok=True)
+ os.makedirs("/data", exist_ok=True)
+
+ api = HfApi(token=HF_TOKEN)
+
+ # ── Pull tokenizer.json from bucket ─────────────────────────────────────────
+ def load_tokenizer():
+     tok_path = f"/data/{TOK_FILENAME}"
+     if not os.path.exists(tok_path):
+         print("Pulling tokenizer.json from bucket...")
+         api.hf_hub_download(
+             repo_id=BUCKET_REPO,
+             repo_type="dataset",
+             filename=TOK_FILENAME,
+             local_dir="/data",
+             token=HF_TOKEN,
+         )
+     tokenizer = Tokenizer.from_file(tok_path)
+     print(f"✓ Tokenizer loaded | vocab: {tokenizer.get_vocab_size():,}")
+     return tokenizer
+
+ # ── State ────────────────────────────────────────────────────────────────────
+ def load_state():
+     if os.path.exists(STATE_FILE):
+         with open(STATE_FILE) as f:
+             state = json.load(f)
+         print(f"Resuming — {len(state['done'])} parquets done")
+     else:
+         state = {
+             "done": [],          # list of parquet indices completed
+             "lang_shards": {},   # {lang: current shard index}
+             "lang_tokens": {},   # {lang: total tokens written so far}
+         }
+         print("Starting fresh")
+     return state
+
+ def save_state(state):
      with open(STATE_FILE, "w") as f:
          json.dump(state, f, indent=2)

+ # ── Shard buffer: one per language, persists across parquets ─────────────────
+ # buffers[lang] = {"rows": [...], "token_count": N}
+ buffers = {}
+
+ def get_buffer(lang):
+     if lang not in buffers:
+         buffers[lang] = {"rows": [], "token_count": 0}
+     return buffers[lang]
+
+ def flush_shard(lang, rows, state):
+     """Write rows to a new shard file and upload to HF dataset repo."""
+     shard_idx = state["lang_shards"].get(lang, 0)
+     lang_dir = Path(OUT_DIR) / lang
+     lang_dir.mkdir(parents=True, exist_ok=True)
+
+     shard_name = f"shard_{shard_idx:05d}.jsonl"
+     shard_path = lang_dir / shard_name
+
+     with open(shard_path, "w") as f:
+         for row in rows:
+             f.write(json.dumps(row, ensure_ascii=False) + "\n")
+
+     # upload to HF
+     api.upload_file(
+         path_or_fileobj=str(shard_path),
+         path_in_repo=f"{lang}/{shard_name}",
+         repo_id=DATASET_REPO,
+         repo_type="dataset",
+         token=HF_TOKEN,
+     )
+     print(f" ✓ Uploaded {lang}/{shard_name} | {len(rows)} samples")
+
+     # update state
+     state["lang_shards"][lang] = shard_idx + 1
+     state["lang_tokens"][lang] = state["lang_tokens"].get(lang, 0) + sum(
+         r["token_count"] for r in rows
+     )
+
+ # ── Core processing loop ─────────────────────────────────────────────────────
+ def process(tokenizer, state):
+     for i in range(TOTAL_PARQUETS):
+         if i in state["done"]:
+             print(f"[{i:05d}] SKIP")
+             continue
+
+         url = PARQUET_URL.format(i=i)
+         print(f"[{i:05d}] Downloading...")
+
+         try:
+             resp = requests.get(
+                 url,
+                 headers={"Authorization": f"Bearer {HF_TOKEN}"},
+                 timeout=120,
              )
+             resp.raise_for_status()
+             df = pd.read_parquet(io.BytesIO(resp.content))
+         except Exception as e:
+             print(f"[{i:05d}] Download error: {e} — skipping")
+             continue

+         print(f"[{i:05d}] {len(df):,} rows | processing...")

+         for lang, group in df.groupby("language"):
+             buf = get_buffer(lang)
+
+             texts = group["code"].fillna("").tolist()
+             repos = group["repo_name"].tolist()
+             paths = group["path"].tolist()
+             licenses = group["license"].tolist()
+
+             encoded = tokenizer.encode_batch(texts)
+
+             for idx, enc in enumerate(encoded):
+                 token_count = len(enc.ids)
+
+                 # skip junk (empty or single token)
+                 if token_count < 2:
+                     continue
+
+                 row = {
+                     "text": texts[idx],
+                     "token_count": token_count,
+                     "repo": repos[idx],
+                     "path": paths[idx],
+                     "license": licenses[idx],
+                 }
+
+                 # if this single sample alone exceeds shard size, still include it
+                 # — don't lose real data, just let that shard be a bit over
+                 if buf["token_count"] + token_count > SHARD_TOKENS and buf["rows"]:
+                     # flush current buffer first
+                     flush_shard(lang, buf["rows"], state)
+                     save_state(state)
+                     buf["rows"] = []
+                     buf["token_count"] = 0
+
+                 buf["rows"].append(row)
+                 buf["token_count"] += token_count
+
+         state["done"].append(i)
+         save_state(state)
+         print(f"[{i:05d}] ✓ Done")
+
+     # ── Flush any remaining partial shards ───────────────────────────────────
+     print("\nFlushing remaining buffers...")
+     for lang, buf in buffers.items():
+         if buf["rows"]:
+             flush_shard(lang, buf["rows"], state)
+             save_state(state)
+
+     # ── Write per-language meta ───────────────────────────────────────────────
+     print("\nWriting meta.json per language...")
+     for lang, total_tokens in state["lang_tokens"].items():
+         meta = {
+             "language": lang,
+             "total_tokens": total_tokens,
+             "total_shards": state["lang_shards"].get(lang, 0),
+         }
+         meta_path = Path(OUT_DIR) / lang / "meta.json"
+         with open(meta_path, "w") as f:
+             json.dump(meta, f, indent=2)
+         api.upload_file(
+             path_or_fileobj=str(meta_path),
+             path_in_repo=f"{lang}/meta.json",
+             repo_id=DATASET_REPO,
+             repo_type="dataset",
+             token=HF_TOKEN,
+         )
+         print(f" {lang}: {total_tokens:,} tokens | {meta['total_shards']} shards")
+
+     print("\n✓ All done!")
+
+ # ── Entry point ──────────────────────────────────────────────────────────────
+ def main():
+     tokenizer = load_tokenizer()
+     state = load_state()
+
+     # fire processing in background so Space stays alive
+     t = threading.Thread(target=process, args=(tokenizer, state), daemon=True)
+     t.start()
+
+     # keep the Space running
+     while True:
+         time.sleep(60)
+
+ if __name__ == "__main__":
+     main()
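
For reference only (not part of this commit): a minimal sketch of how a shard uploaded by flush_shard could be read back. The repo id and the "<language>/shard_XXXXX.jsonl" layout follow the config and naming in app.py above; the concrete language ("Python") and shard index are placeholder examples.

# Sketch, not part of the commit: fetch and inspect one uploaded shard.
import json
from huggingface_hub import hf_hub_download

# "Python/shard_00000.jsonl" is only an example; actual languages and shard
# indices depend on what the Space has processed and uploaded so far.
shard = hf_hub_download(
    repo_id="Neon-coding/github-code-raw",
    repo_type="dataset",
    filename="Python/shard_00000.jsonl",
)
with open(shard) as f:
    rows = [json.loads(line) for line in f]
print(len(rows), "samples;", sum(r["token_count"] for r in rows), "tokens")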