Imaginethat committed on
Commit
62c6ba2
·
verified ·
1 Parent(s): 24f62d8

Update run_job.py

Browse files
Files changed (1) hide show
  1. run_job.py +204 -188
run_job.py CHANGED
@@ -1,188 +1,204 @@
1
- import gradio as gr
2
- import os
3
- import threading
4
- import subprocess
5
- import time
6
- import shutil
7
- from huggingface_hub import HfApi, hf_hub_download, create_repo
8
-
9
- # --- CONFIG ---
10
- INPUT_DATASET = "The-data-company/TikTok-10M"
11
- # CHANGE THIS TO YOUR USERNAME!
12
- OUTPUT_REPO = "Imaginethat/tiktok-sys7-mined-results"
13
- HF_TOKEN = os.environ.get("HF_TOKEN")
14
-
15
- # Global variables for the UI
16
- log_buffer = "Initializing Environment...\n"
17
- dataset_link_md = ""
18
- mining_active = False
19
-
20
- def log(msg):
21
- """Adds a message to the UI log window"""
22
- global log_buffer
23
- timestamp = time.strftime("%H:%M:%S")
24
- entry = f"[{timestamp}] {msg}"
25
- print(entry) # Keep console logs as backup
26
- log_buffer += entry + "\n"
27
-
28
- def run_mining_logic():
29
- global dataset_link_md, mining_active
30
-
31
- if not HF_TOKEN:
32
- log("ERROR: HF_TOKEN secret is missing in Settings!")
33
- return
34
-
35
- api = HfApi(token=HF_TOKEN)
36
-
37
- # 1. Setup Repo
38
- log(f"Setting up Output Repo: {OUTPUT_REPO}")
39
- try:
40
- create_repo(repo_id=OUTPUT_REPO, repo_type="dataset", exist_ok=True)
41
- except Exception as e:
42
- log(f"Repo setup note: {e}")
43
-
44
- # 2. List Files
45
- log(f"Scanning {INPUT_DATASET} for files...")
46
- try:
47
- all_files = api.list_repo_files(repo_id=INPUT_DATASET, repo_type="dataset")
48
- parquet_files = [f for f in all_files if f.endswith(".parquet") and "train" in f]
49
- log(f"Found {len(parquet_files)} shards to process.")
50
- except Exception as e:
51
- log(f"Failed to list files: {e}")
52
- return
53
-
54
- # 3. Process Shards
55
- input_dir = "./data_input"
56
- output_dir = "./data_output"
57
-
58
- for i, file_path in enumerate(parquet_files):
59
- log(f"--- Processing Shard {i+1}/{len(parquet_files)}: {file_path} ---")
60
-
61
- # Cleanup
62
- if os.path.exists(input_dir): shutil.rmtree(input_dir)
63
- if os.path.exists(output_dir): shutil.rmtree(output_dir)
64
- os.makedirs(input_dir, exist_ok=True)
65
- os.makedirs(output_dir, exist_ok=True)
66
-
67
- # Download
68
- try:
69
- local_path = hf_hub_download(
70
- repo_id=INPUT_DATASET,
71
- filename=file_path,
72
- repo_type="dataset",
73
- local_dir=input_dir,
74
- token=HF_TOKEN
75
- )
76
- except Exception as e:
77
- log(f"Download failed: {e}")
78
- continue
79
-
80
- # Run Miner (Streaming output to UI)
81
- shard_output_name = f"sys7_features_part_{i:04d}.parquet"
82
- shard_output_path = os.path.join(output_dir, shard_output_name)
83
-
84
- cmd = [
85
- "python", "-u", "sys7_miner_2.py", # -u forces unbuffered output
86
- "--input-path", input_dir,
87
- "--output-parquet", shard_output_path,
88
- "--lexicons", "system7_lexicons.json",
89
- "--label-orders", "label_orders.json",
90
- "--slang-lexicon", "slang_lexicon.json",
91
- "--phrase-lexicon", "sys7_phrase_lexicons_desc_only.json",
92
- "--auto-language-detect",
93
- "--language-allowlist", "en",
94
- "--min-chars", "10",
95
- "--min-tokens", "2",
96
- "--device", "cuda",
97
- "--batch-size", "50000",
98
- "--embedding-batch-size", "512",
99
- "--workers", "4"
100
- ]
101
-
102
- # Use Popen to capture logs in real-time
103
- try:
104
- process = subprocess.Popen(
105
- cmd,
106
- stdout=subprocess.PIPE,
107
- stderr=subprocess.STDOUT,
108
- text=True,
109
- bufsize=1
110
- )
111
-
112
- # Read stdout line by line
113
- for line in process.stdout:
114
- if "Processed" in line or "Error" in line: # Filter logs to keep UI clean?
115
- log("MINER: " + line.strip())
116
- elif "Loading" in line:
117
- log("MINER: " + line.strip())
118
-
119
- process.wait()
120
-
121
- if process.returncode != 0:
122
- log(f"Miner failed with code {process.returncode}")
123
- continue
124
-
125
- except Exception as e:
126
- log(f"Subprocess error: {e}")
127
- continue
128
-
129
- # Upload
130
- try:
131
- log(f"Uploading {shard_output_name}...")
132
- api.upload_file(
133
- path_or_fileobj=shard_output_path,
134
- path_in_repo=f"data/{shard_output_name}",
135
- repo_id=OUTPUT_REPO,
136
- repo_type="dataset"
137
- )
138
- log("Upload Successful!")
139
- except Exception as e:
140
- log(f"Upload failed: {e}")
141
-
142
- # 4. Finish
143
- final_url = f"https://huggingface.co/datasets/{OUTPUT_REPO}"
144
- log("ALL JOBS COMPLETE.")
145
- dataset_link_md = f"## 🎉 DONE! Data is ready: [Click Here to View Dataset]({final_url})"
146
- mining_active = False
147
-
148
- def start_thread():
149
- global mining_active
150
- if mining_active:
151
- return "Already Running..."
152
- mining_active = True
153
- t = threading.Thread(target=run_mining_logic)
154
- t.start()
155
- return "Mining Started! Watch logs below."
156
-
157
- # --- GRADIO UI ---
158
- with gr.Blocks(title="TikTok Miner") as demo:
159
- gr.Markdown("# ⛏️ System 7 Miner Control")
160
-
161
- with gr.Row():
162
- start_btn = gr.Button("🚀 Start Mining Job", variant="primary")
163
- status_txt = gr.Textbox(label="Status", value="Ready to start.", interactive=False)
164
-
165
- # The Log Window
166
- logs_out = gr.TextArea(
167
- label="Live Process Logs",
168
- placeholder="Logs will stream here...",
169
- lines=20,
170
- max_lines=25,
171
- autoscroll=True
172
- )
173
-
174
- # The Success Link (Hidden until done)
175
- link_display = gr.Markdown("")
176
-
177
- # Button Action
178
- start_btn.click(fn=start_thread, inputs=None, outputs=status_txt)
179
-
180
- # Auto-refresh the logs every 1 second
181
- def update_ui():
182
- return log_buffer, dataset_link_md
183
-
184
- timer = gr.Timer(1)
185
- timer.tick(fn=update_ui, outputs=[logs_out, link_display])
186
-
187
- # Launch on the port HF expects
188
- demo.launch(server_name="0.0.0.0", server_port=7860)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ import os
3
+ import threading
4
+ import subprocess
5
+ import time
6
+ import shutil
7
+ from huggingface_hub import HfApi, hf_hub_download, create_repo
8
+
9
+ # --- CONFIG ---
10
+ INPUT_DATASET = "The-data-company/TikTok-10M"
11
+ # CHANGE THIS TO YOUR USERNAME!
12
+ OUTPUT_REPO = "Imaginethat/tiktok-sys7-mined-results"
13
+ HF_TOKEN = os.environ.get("HF_TOKEN")
14
+
15
# Global variables for the UI (shared between the worker thread and Gradio).
log_buffer = "Initializing Environment...\n"
dataset_link_md = ""
mining_active = False

# Keep at most this many log lines in memory; the UI shows ~25 at a time,
# so an unbounded buffer only wastes RAM on long mining runs.
MAX_LOG_LINES = 2000

def log(msg):
    """Append a timestamped message to the shared UI log buffer.

    Also prints to stdout so logs remain visible in the container
    console. The buffer is trimmed to the last MAX_LOG_LINES lines so
    a long-running job cannot grow memory without bound.
    """
    global log_buffer
    timestamp = time.strftime("%H:%M:%S")
    entry = f"[{timestamp}] {msg}"
    print(entry)  # Keep console logs as backup
    log_buffer += entry + "\n"
    lines = log_buffer.splitlines(keepends=True)
    if len(lines) > MAX_LOG_LINES:
        log_buffer = "".join(lines[-MAX_LOG_LINES:])
27
+
28
def run_mining_logic():
    """Download each input shard, run the miner subprocess on it, and
    upload the resulting parquet file to OUTPUT_REPO.

    Runs on a background thread started by start_thread(). Progress is
    reported through log(); completion publishes a dataset link via the
    dataset_link_md global. The mining_active flag is ALWAYS reset on
    exit (including early error returns and unexpected exceptions) so
    the Start button becomes usable again.
    """
    global dataset_link_md, mining_active

    try:
        if not HF_TOKEN:
            log("ERROR: HF_TOKEN secret is missing in Settings!")
            return

        api = HfApi(token=HF_TOKEN)

        # 1. Setup Repo
        log(f"Setting up Output Repo: {OUTPUT_REPO}")
        try:
            # Pass the token explicitly: create_repo is a module-level
            # helper and does not share the HfApi instance's credentials.
            create_repo(repo_id=OUTPUT_REPO, repo_type="dataset",
                        exist_ok=True, token=HF_TOKEN)
        except Exception as e:
            log(f"Repo setup note: {e}")

        # 2. List input shards. Sorting makes the part numbering stable
        # across restarts, which the resume logic below depends on.
        log(f"Scanning {INPUT_DATASET} for files...")
        try:
            all_files = api.list_repo_files(repo_id=INPUT_DATASET, repo_type="dataset")
            parquet_files = sorted(
                f for f in all_files if f.endswith(".parquet") and "train" in f
            )
            log(f"Found {len(parquet_files)} shards to process.")
        except Exception as e:
            log(f"Failed to list files: {e}")
            return

        # 3. Process Shards
        input_dir = "./data_input"
        output_dir = "./data_output"

        # Resume support: skip shards already uploaded to OUTPUT_REPO.
        existing_outputs = set()
        try:
            existing_outputs = set(api.list_repo_files(repo_id=OUTPUT_REPO, repo_type="dataset"))
            done = [p for p in existing_outputs
                    if p.startswith("data/sys7_features_part_") and p.endswith(".parquet")]
            log(f"Resume: found {len(done)} existing output shards in {OUTPUT_REPO}.")
        except Exception as e:
            log(f"Resume warning: could not list output repo files: {e}")

        for i, file_path in enumerate(parquet_files):
            shard_output_name = f"sys7_features_part_{i:04d}.parquet"
            remote_out_path = f"data/{shard_output_name}"
            if remote_out_path in existing_outputs:
                log(f"Skipping Shard {i+1}/{len(parquet_files)} (already uploaded): {remote_out_path}")
                continue

            log(f"--- Processing Shard {i+1}/{len(parquet_files)}: {file_path} ---")

            # Fresh working dirs for every shard so the miner never sees
            # stale inputs/outputs from a previous iteration.
            for d in (input_dir, output_dir):
                if os.path.exists(d):
                    shutil.rmtree(d)
                os.makedirs(d, exist_ok=True)

            # Download the shard into the input directory.
            try:
                hf_hub_download(
                    repo_id=INPUT_DATASET,
                    filename=file_path,
                    repo_type="dataset",
                    local_dir=input_dir,
                    token=HF_TOKEN,
                )
            except Exception as e:
                log(f"Download failed: {e}")
                continue

            # Run Miner (streaming output to the UI)
            shard_output_path = os.path.join(output_dir, shard_output_name)

            cmd = [
                "python", "-u", "sys7_miner_2.py",  # -u forces unbuffered output
                "--input-path", input_dir,
                "--output-parquet", shard_output_path,
                "--lexicons", "system7_lexicons.json",
                "--label-orders", "label_orders.json",
                "--slang-lexicon", "slang_lexicon.json",
                "--phrase-lexicon", "sys7_phrase_lexicons_desc_only.json",
                "--auto-language-detect",
                "--language-allowlist", "en",
                "--min-chars", "10",
                "--min-tokens", "2",
                "--device", "cuda",
                "--batch-size", "50000",
                "--embedding-batch-size", "512",
                "--workers", "4"
            ]

            # Use Popen to capture logs in real-time.
            try:
                process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1
                )

                # Forward only the interesting miner lines to keep the
                # UI log readable.
                for line in process.stdout:
                    if "Processed" in line or "Error" in line:
                        log("MINER: " + line.strip())
                    elif "Loading" in line:
                        log("MINER: " + line.strip())

                process.wait()

                if process.returncode != 0:
                    log(f"Miner failed with code {process.returncode}")
                    continue

            except Exception as e:
                log(f"Subprocess error: {e}")
                continue

            # Upload the mined shard to the output dataset repo.
            try:
                log(f"Uploading {shard_output_name}...")
                api.upload_file(
                    path_or_fileobj=shard_output_path,
                    path_in_repo=remote_out_path,
                    repo_id=OUTPUT_REPO,
                    repo_type="dataset"
                )
                existing_outputs.add(remote_out_path)
                log("Upload Successful!")
            except Exception as e:
                log(f"Upload failed: {e}")

        # 4. Finish
        final_url = f"https://huggingface.co/datasets/{OUTPUT_REPO}"
        log("ALL JOBS COMPLETE.")
        dataset_link_md = f"## 🎉 DONE! Data is ready: [Click Here to View Dataset]({final_url})"
    finally:
        # Always release the run lock: the original code only reset this
        # on the success path, so any early return left the Start button
        # permanently reporting "Already Running...".
        mining_active = False
163
+
164
def start_thread():
    """Launch the mining job on a background daemon thread.

    Returns a status string for the UI Status textbox. The module-level
    mining_active flag prevents launching a second concurrent job.
    """
    global mining_active
    if mining_active:
        return "Already Running..."
    mining_active = True
    # daemon=True so a hung or long-running job cannot block process
    # shutdown/restart of the Space.
    t = threading.Thread(target=run_mining_logic, daemon=True)
    t.start()
    return "Mining Started! Watch logs below."
172
+
173
# --- GRADIO UI ---
# `demo` must keep this name: HF Spaces auto-detects it as the app entry.
with gr.Blocks(title="TikTok Miner") as demo:
    gr.Markdown("# ⛏️ System 7 Miner Control")

    with gr.Row():
        launch_button = gr.Button("🚀 Start Mining Job", variant="primary")
        status_box = gr.Textbox(label="Status", value="Ready to start.", interactive=False)

    # Live log window; content is refreshed by the timer below.
    log_window = gr.TextArea(
        label="Live Process Logs",
        placeholder="Logs will stream here...",
        lines=20,
        max_lines=25,
        autoscroll=True,
    )

    # Markdown slot for the final dataset link (empty until the job finishes).
    result_link = gr.Markdown("")

    # Clicking the button starts the worker thread; its status string
    # lands in the Status textbox.
    launch_button.click(fn=start_thread, inputs=None, outputs=status_box)

    def _refresh():
        # Poll the module-level state written by the worker thread.
        return log_buffer, dataset_link_md

    # Push fresh logs and the completion link to the UI once a second.
    ui_timer = gr.Timer(1)
    ui_timer.tick(fn=_refresh, outputs=[log_window, result_link])

# Launch on the port HF expects
demo.launch(server_name="0.0.0.0", server_port=7860)