Imaginethat committed on
Commit
be21dd9
·
verified ·
1 Parent(s): a9e19bf

Upload run_job.py

Browse files
Files changed (1) hide show
  1. run_job.py +183 -0
run_job.py ADDED
@@ -0,0 +1,183 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import gradio as gr
import os
import threading
import subprocess
import time
import shutil
from huggingface_hub import HfApi, hf_hub_download, create_repo

# --- CONFIG ---
# Source dataset whose parquet shards are downloaded and mined.
INPUT_DATASET = "The-data-company/TikTok-10M"
# CHANGE THIS TO YOUR USERNAME!
# Destination dataset repo that receives the mined feature shards.
OUTPUT_REPO = "Imaginethat/tiktok-sys7-mined-results"
# Hub token; must be set as a secret in the Space settings.
HF_TOKEN = os.environ.get("HF_TOKEN")

# Global variables for the UI
log_buffer = "Initializing Environment...\n"  # accumulated log text shown in the log window
dataset_link_md = ""  # markdown link revealed once the whole job finishes
mining_active = False  # guard flag: prevents starting a second concurrent job
19
+
20
def log(msg):
    """Append a timestamped message to the UI log window (mirrored to stdout)."""
    global log_buffer
    stamped = f"[{time.strftime('%H:%M:%S')}] {msg}"
    # Mirror to the console so logs survive even if the UI is not watched.
    print(stamped)
    log_buffer = log_buffer + stamped + "\n"
27
+
28
def run_mining_logic():
    """Mine every matching parquet shard of INPUT_DATASET and upload the results.

    For each shard: reset the scratch directories, download the shard, run
    ``sys7_miner.py`` as a subprocess (relaying selected output lines to the
    UI log), then upload the produced parquet to OUTPUT_REPO.

    Runs in a background thread; it communicates with the Gradio UI only
    through ``log()`` and the module globals ``dataset_link_md`` and
    ``mining_active``.
    """
    global dataset_link_md, mining_active

    try:
        if not HF_TOKEN:
            log("ERROR: HF_TOKEN secret is missing in Settings!")
            return

        api = HfApi(token=HF_TOKEN)

        # 1. Setup Repo
        log(f"Setting up Output Repo: {OUTPUT_REPO}")
        try:
            # Pass the token explicitly (consistent with the other hub calls)
            # instead of relying on the ambient environment variable.
            create_repo(
                repo_id=OUTPUT_REPO,
                repo_type="dataset",
                exist_ok=True,
                token=HF_TOKEN,
            )
        except Exception as e:
            log(f"Repo setup note: {e}")

        # 2. List Files
        log(f"Scanning {INPUT_DATASET} for files...")
        try:
            all_files = api.list_repo_files(repo_id=INPUT_DATASET, repo_type="dataset")
            parquet_files = [
                f for f in all_files if f.endswith(".parquet") and "train" in f
            ]
            log(f"Found {len(parquet_files)} shards to process.")
        except Exception as e:
            log(f"Failed to list files: {e}")
            return

        # 3. Process Shards
        input_dir = "./data_input"
        output_dir = "./data_output"

        for i, file_path in enumerate(parquet_files):
            log(f"--- Processing Shard {i+1}/{len(parquet_files)}: {file_path} ---")

            # Cleanup: every shard starts from empty scratch directories.
            for d in (input_dir, output_dir):
                if os.path.exists(d):
                    shutil.rmtree(d)
                os.makedirs(d, exist_ok=True)

            # Download
            try:
                hf_hub_download(
                    repo_id=INPUT_DATASET,
                    filename=file_path,
                    repo_type="dataset",
                    local_dir=input_dir,
                    token=HF_TOKEN,
                )
            except Exception as e:
                log(f"Download failed: {e}")
                continue

            # Run Miner (streaming output to UI)
            shard_output_name = f"sys7_features_part_{i:04d}.parquet"
            shard_output_path = os.path.join(output_dir, shard_output_name)

            cmd = [
                "python", "-u", "sys7_miner.py",  # -u forces unbuffered output
                "--input-path", input_dir,
                "--output-parquet", shard_output_path,
                "--lexicons", "system7_lexicons.json",
                "--label-orders", "label_orders.json",
                "--slang-lexicon", "slang_lexicon.json",
                "--device", "cuda",
                "--batch-size", "50000",
                "--embedding-batch-size", "512",
                "--workers", "4",
            ]

            # Use Popen so the miner's output can be relayed in real time.
            try:
                process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1,
                )

                # Relay only the interesting lines to keep the UI log readable.
                for line in process.stdout:
                    if any(key in line for key in ("Processed", "Error", "Loading")):
                        log("MINER: " + line.strip())

                process.wait()

                if process.returncode != 0:
                    log(f"Miner failed with code {process.returncode}")
                    continue

            except Exception as e:
                log(f"Subprocess error: {e}")
                continue

            # Upload
            try:
                log(f"Uploading {shard_output_name}...")
                api.upload_file(
                    path_or_fileobj=shard_output_path,
                    path_in_repo=f"data/{shard_output_name}",
                    repo_id=OUTPUT_REPO,
                    repo_type="dataset",
                )
                log("Upload Successful!")
            except Exception as e:
                log(f"Upload failed: {e}")

        # 4. Finish
        final_url = f"https://huggingface.co/datasets/{OUTPUT_REPO}"
        log("ALL JOBS COMPLETE.")
        dataset_link_md = (
            f"## 🎉 DONE! Data is ready: [Click Here to View Dataset]({final_url})"
        )
    finally:
        # BUG FIX: always clear the flag — the original only reset it on the
        # happy path, so an early return (missing token, listing failure) or
        # an unexpected exception left the Start button stuck on
        # "Already Running..." forever.
        mining_active = False
142
+
143
def start_thread():
    """Launch the mining job in a background thread (at most one at a time).

    Returns:
        A short status string displayed in the UI's Status textbox.
    """
    global mining_active
    if mining_active:
        return "Already Running..."
    mining_active = True
    # daemon=True so a mid-flight job cannot block process shutdown
    # (e.g. when the Space is restarted); the original thread was non-daemon.
    t = threading.Thread(target=run_mining_logic, daemon=True)
    t.start()
    return "Mining Started! Watch logs below."
151
+
152
# --- GRADIO UI ---
def _refresh_view():
    """Poll the module-level state the worker thread writes into."""
    return log_buffer, dataset_link_md

with gr.Blocks(title="TikTok Miner") as demo:
    gr.Markdown("# ⛏️ System 7 Miner Control")

    with gr.Row():
        run_button = gr.Button("🚀 Start Mining Job", variant="primary")
        status_box = gr.Textbox(label="Status", value="Ready to start.", interactive=False)

    # Live log window, refreshed once per second from log_buffer.
    log_view = gr.TextArea(
        label="Live Process Logs",
        placeholder="Logs will stream here...",
        lines=20,
        max_lines=25,
        autoscroll=True,
    )

    # Stays empty until the job finishes, then shows the dataset link.
    done_link = gr.Markdown("")

    # Wire the button to the background-thread launcher.
    run_button.click(fn=start_thread, inputs=None, outputs=status_box)

    # Auto-refresh the logs every 1 second.
    poll_timer = gr.Timer(1)
    poll_timer.tick(fn=_refresh_view, outputs=[log_view, done_link])

# Launch on the port HF expects
demo.launch(server_name="0.0.0.0", server_port=7860)