"""
System 7 Miner
---------------
Module 1: Clean TikTok rows, apply lexicon-derived raw signals, and emit feature vectors.
Core outputs:
- tiktok10m_sys7_features.parquet
- optional sys7_miner_metadata.json
This script streams large CSV or Parquet inputs in chunks (it never loads all
10M rows at once), normalizes text with ftfy when available, computes raw
lexical scores from the System 6/7 lexicon bundles, and generates contextual
embeddings via sentence-transformers.
"""
from __future__ import annotations
import argparse
import json
import logging
import math
import os
import re
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
try:
from sentence_transformers import SentenceTransformer
except ImportError: # pragma: no cover - handled at runtime
SentenceTransformer = None # type: ignore
try:
import ftfy
except ImportError: # pragma: no cover - handled at runtime
ftfy = None # type: ignore
try:
from langdetect import detect as langdetect_detect
from langdetect.detector_factory import DetectorFactory
except ImportError: # pragma: no cover - optional dependency
langdetect_detect = None # type: ignore
else: # pragma: no cover - runtime behavior
# Make langdetect deterministic across runs/processes.
DetectorFactory.seed = 0
logger = logging.getLogger("sys7_miner")
CONTROL_CHARS_RE = re.compile(r"[\u0000-\u0008\u000b\u000c\u000e-\u001f]")
REPLACEMENT_RUN_RE = re.compile("�{2,}")
WHITESPACE_RE = re.compile(r"\s+")
HASHTAG_SPLIT_RE = re.compile(r"[A-Z]?[a-z]+|[0-9]+")
TOKENIZER_RE = re.compile(r"[a-z0-9']+")
TIME_ALNUM_RE = re.compile(r"[^a-z0-9]+")
TIME_HOLIDAY_MONTH_MAP: Dict[str, int] = {
"newyear": 1,
"newyears": 1,
"nye": 1,
"valentine": 2,
"valentines": 2,
"stpatrick": 3,
"easter": 4,
"mothersday": 5,
"memorial": 5,
"juneteenth": 6,
"pride": 6,
"father": 6,
"independence": 7,
"july4": 7,
"labor": 9,
"halloween": 10,
"thanksgiving": 11,
"blackfriday": 11,
"cybermonday": 11,
"christmas": 12,
"xmas": 12,
"hanukkah": 12,
}
TIME_MONTH_KEYWORDS: Dict[str, int] = {
"january": 1,
"jan": 1,
"february": 2,
"feb": 2,
"march": 3,
"mar": 3,
"april": 4,
"apr": 4,
"may": 5,
"june": 6,
"jun": 6,
"july": 7,
"jul": 7,
"august": 8,
"aug": 8,
"september": 9,
"sept": 9,
"sep": 9,
"october": 10,
"oct": 10,
"november": 11,
"nov": 11,
"december": 12,
"dec": 12,
}
TIME_SEASON_TERMS = {
"spring",
"summer",
"fall",
"autumn",
"winter",
"backtoschool",
"graduation",
}
TIME_VIRAL_TOKENS = {
"fyp",
"foryou",
"foryoupage",
"viral",
"trending",
"trend",
"xyzbca",
}
def _time_normalize_token(token: str) -> str:
return TIME_ALNUM_RE.sub("", (token or "").lower())
def _time_parse_created_month(created_date: Optional[str]) -> Optional[int]:
if not created_date:
return None
try:
parts = str(created_date).split("T", 1)[0].split(" ", 1)[0].split("-", 2)
if len(parts) >= 2:
m = int(parts[1])
if 1 <= m <= 12:
return m
except Exception:
return None
return None
def _time_detect_month_from_token(token: str) -> Optional[int]:
token = _time_normalize_token(token)
if not token:
return None
if token in TIME_MONTH_KEYWORDS:
return TIME_MONTH_KEYWORDS[token]
for holiday, month in TIME_HOLIDAY_MONTH_MAP.items():
if holiday in token:
return month
return None
def _time_squash_hits(hits: int, *, base: float = 0.35, step: float = 0.15) -> float:
if hits <= 0:
return 0.0
return float(min(1.0, base + step * (hits - 1)))
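# Illustrative squash curve (with the defaults above): 1 hit -> 0.35, 2 hits -> ~0.50,
# 3 hits -> ~0.65, and so on; the score is capped at 1.0, so roughly six or more hits
# saturate the signal.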
def compute_time_scores_derived(tokens: Sequence[str], created_date: Optional[str], label_order: Sequence[str]) -> List[float]:
seasonal_hits = 0
viral_hits = 0
token_month_hits: Dict[int, int] = {}
for tok in tokens or []:
norm = _time_normalize_token(str(tok))
if not norm:
continue
if norm in TIME_VIRAL_TOKENS:
viral_hits += 1
if norm in TIME_SEASON_TERMS:
seasonal_hits += 1
m = _time_detect_month_from_token(norm)
if m is not None:
seasonal_hits += 1
token_month_hits[m] = token_month_hits.get(m, 0) + 1
seasonal = _time_squash_hits(seasonal_hits)
viral = _time_squash_hits(viral_hits)
created_month = _time_parse_created_month(created_date)
if created_month and token_month_hits:
dominant_month = max(token_month_hits.items(), key=lambda kv: kv[1])[0]
if dominant_month == created_month and seasonal > 0:
seasonal = float(min(1.0, seasonal + 0.05))
by_label = {"seasonal": seasonal, "viral": viral}
return [float(by_label.get(label, 0.0)) for label in label_order]
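# Illustrative example (hypothetical tokens and label order): for tokens
# ["christmas", "fyp"], created_date "2023-12-25", and label_order
# ["seasonal", "viral"], "christmas" maps to month 12 (one seasonal hit, nudged up
# because it matches the created month) and "fyp" is one viral hit, giving roughly
# [0.40, 0.35].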
@dataclass
class MinerConfig:
input_path: str
output_parquet: str = "tiktok10m_sys7_features.parquet"
lexicons: str = "system7_lexicons.json"
phrase_lexicon: Optional[str] = None
phrase_weight_scale: float = 1.0
slang_lexicon: str = "slang_lexicon.json"
label_orders: str = "label_orders.json"
batch_size: int = 50_000
text_cap: int = 1024
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: int = 128
device: str = "cpu"
workers: int = 2
metadata_path: Optional[str] = "sys7_miner_metadata.json"
diagnostics_path: Optional[str] = "sys7_miner_diagnostics.json"
min_chars: int = 10
min_tokens: int = 2
language_allowlist: Optional[List[str]] = None # e.g., ["en"]
auto_language_detect: bool = False
emit_flat_scores: bool = False
def load_json(path: str) -> Dict:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def auto_detect_language(text: str) -> Optional[str]:
"""Best-effort language detection using langdetect, if available."""
if not langdetect_detect:
return None
if not text:
return None
# langdetect cost scales with text length; captions + hashtags can be long/noisy.
# Truncate to keep detection cheap and stable.
if len(text) > 400:
text = text[:400]
try:
return str(langdetect_detect(text))
except Exception:
return None
def detect_languages_parallel(texts: List[str], workers: int) -> List[Optional[str]]:
"""Detect languages for many texts, optionally using multiprocessing.
langdetect is CPU-bound; using a ProcessPool sidesteps the GIL.
"""
if not texts:
return []
workers = int(workers or 0)
if workers <= 1 or not langdetect_detect:
return [auto_detect_language(t) for t in texts]
# Clamp to avoid spawning an excessive number of processes.
max_workers = min(workers, os.cpu_count() or workers)
chunksize = 256 # critical for many short strings
with ProcessPoolExecutor(max_workers=max_workers) as ex:
return list(ex.map(auto_detect_language, texts, chunksize=chunksize))
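# Illustrative usage (assumes langdetect is installed; detection can be unreliable
# on very short strings):
#   detect_languages_parallel(["bonjour tout le monde", "hello there friends"], workers=2)
#   would typically return something like ["fr", "en"].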
def is_nullish(val) -> bool:
if val is None:
return True
if isinstance(val, float) and math.isnan(val):
return True
if isinstance(val, str) and val.strip() == "":
return True
return False
def coerce_id(val) -> Optional[str]:
"""Coerce IDs to strings (safe for very large integers)."""
if is_nullish(val):
return None
if isinstance(val, (int, np.integer)):
return str(int(val))
if isinstance(val, (float, np.floating)):
if math.isnan(float(val)):
return None
if float(val).is_integer():
return str(int(val))
return str(val)
    s = str(val).strip()
    if s.endswith(".0"):
        head = s[:-2]
        if head.isdigit():
            # Strip the ".0" directly; round-tripping through float would lose
            # precision for 19-digit TikTok IDs.
            return head
        try:
            return str(int(float(s)))
        except Exception:
            pass
    return s
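# Illustrative behaviors: coerce_id(7234567890123456789) -> "7234567890123456789",
# coerce_id("12345.0") -> "12345", coerce_id("") -> None.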
def first_present(row, *names: str):
for name in names:
if hasattr(row, name):
val = getattr(row, name, None)
if not is_nullish(val):
return val
return None
def extract_hashtag_tags(raw) -> List[str]:
"""
Extract a list of hashtag/challenge strings from a variety of formats:
- JSON string list: '["tag1","tag2"]'
- JSON string list of objects: '[{"title":"tag"}]'
- whitespace/comma separated string
- list/tuple of strings (or dicts)
"""
if is_nullish(raw):
return []
if isinstance(raw, (list, tuple)):
out: List[str] = []
for item in raw:
if is_nullish(item):
continue
if isinstance(item, str):
out.append(item.strip())
continue
if isinstance(item, dict):
for k in ("title", "name", "tag", "challenge", "challenge_name", "text"):
v = item.get(k)
if isinstance(v, str) and v.strip():
out.append(v.strip())
break
return [t for t in out if t]
if isinstance(raw, str):
s = raw.strip()
if not s:
return []
if s[:1] in ("[", "{"):
try:
obj = json.loads(s)
if isinstance(obj, list):
return extract_hashtag_tags(obj)
if isinstance(obj, dict):
for k in ("hashtags", "challenges", "tags"):
v = obj.get(k)
if v is not None:
return extract_hashtag_tags(v)
except Exception:
pass
# Fallback: find #tags, else split on commas/whitespace
tags = re.findall(r"#([A-Za-z0-9_]+)", s)
if tags:
return tags
parts = [p.strip().strip('"').strip("'").strip("[](){}") for p in re.split(r"[,\s]+", s) if p.strip()]
return [p.lstrip("#") for p in parts if p and p != "#"]
return []
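# Illustrative inputs (all hypothetical): '["fyp", "dance"]' and
# '[{"title": "fyp"}, {"title": "dance"}]' both yield ["fyp", "dance"];
# "#fyp #dance" and "fyp, dance" yield the same list via the fallback parsing.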
def prepare_slang_map(raw) -> Dict[str, str]:
"""
Normalize slang lexicon to token -> canonical mapping.
Accepts dict (already mapping) or list of objects with 'token' and optional 'canonical' keys.
"""
mapping: Dict[str, str] = {}
if isinstance(raw, dict):
for k, v in raw.items():
mapping[str(k).lower()] = str(v) if v is not None else str(k).lower()
return mapping
if isinstance(raw, list):
for item in raw:
if not isinstance(item, dict):
continue
token = item.get("token") or item.get("slang") or item.get("term")
canonical = item.get("canonical") or item.get("value") or token
if token:
mapping[str(token).lower()] = str(canonical).lower()
        return mapping
    # Unrecognized lexicon shape: return an empty mapping rather than None so
    # callers can always use dict lookups.
    return mapping
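# Illustrative shapes (hypothetical entries): {"grwm": "get ready with me"} is used
# as-is (keys lowercased), while [{"token": "GRWM", "canonical": "get ready with me"}]
# normalizes to {"grwm": "get ready with me"}.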
def orient_lexicons(raw_lexicons: Dict[str, Dict]) -> Dict[str, Dict[str, Dict[str, float]]]:
"""
Normalize lexicons into dim -> token -> {label: weight}.
Accepts lexicons shaped as label -> token -> weight or weight objects.
Ignores non-dict top-level entries (e.g., config/sources).
"""
def to_weight(val):
if isinstance(val, dict) and "weight" in val:
return val["weight"]
return val
oriented: Dict[str, Dict[str, Dict[str, float]]] = {}
for dim, labels in raw_lexicons.items():
if not isinstance(labels, dict):
continue # skip config/metadata blocks
token_map: Dict[str, Dict[str, float]] = defaultdict(dict)
for label, token_weights in labels.items():
if not isinstance(token_weights, dict):
continue
for token, weight in token_weights.items():
w = to_weight(weight)
try:
w_float = float(w)
except (TypeError, ValueError):
continue
token_map[token.lower()][label] = w_float
oriented[dim] = token_map
return oriented
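# Illustrative reshaping (hypothetical lexicon): {"tribe": {"MomTok": {"momlife": 1.5}}}
# becomes {"tribe": {"momlife": {"MomTok": 1.5}}}, i.e. dim -> token -> {label: weight}.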
def standardize_quotes(text: str) -> str:
return (
text.replace("“", '"')
.replace("”", '"')
.replace("‘", "'")
.replace("’", "'")
.replace("´", "'")
)
def split_hashtag(tag: str) -> List[str]:
parts = HASHTAG_SPLIT_RE.findall(tag)
if parts:
return [p.lower() for p in parts]
return [tag.lower()]
def normalize_hashtags(raw: Optional[str], slang_map: Dict[str, str]) -> List[str]:
if raw is None or (isinstance(raw, float) and math.isnan(raw)):
return []
tokens: List[str] = []
if isinstance(raw, str):
raw_parts = re.split(r"[,\s]+", raw)
elif isinstance(raw, (list, tuple)):
raw_parts = raw
else:
return tokens
for part in raw_parts:
if not part:
continue
cleaned = part.lstrip("#").strip()
if not cleaned:
continue
for token in split_hashtag(cleaned):
mapped = slang_map.get(token, token)
if mapped:
tokens.append(mapped)
return tokens
def fuse_text(description: Optional[str], hashtags: Optional[str | Sequence[str]], slang_map: Dict[str, str], cap: int) -> Tuple[str, List[str]]:
desc = "" if description is None or (isinstance(description, float) and math.isnan(description)) else str(description)
ht_tokens = normalize_hashtags(hashtags, slang_map)
hashtag_text = " ".join(ht_tokens)
fused = " ".join(filter(None, [desc, hashtag_text])).lower()
    if ftfy:
        # Repair mojibake and normalize to NFC when ftfy is available.
        fused = ftfy.fix_text(fused, normalization="NFC")
fused = CONTROL_CHARS_RE.sub(" ", fused)
fused = REPLACEMENT_RUN_RE.sub(" ", fused)
fused = standardize_quotes(fused)
fused = WHITESPACE_RE.sub(" ", fused).strip()
if cap and len(fused) > cap:
fused = fused[:cap]
return fused, ht_tokens
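# Illustrative fusion (hypothetical row, empty slang map): description
# "GRWM for school" with hashtags ["#GrwmRoutine"] lowercases and joins into
# something like "grwm for school grwm routine" (hashtags are split on
# case/digit boundaries before joining).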
def tokenize(text: str, hashtag_tokens: Sequence[str]) -> List[str]:
base_tokens = TOKENIZER_RE.findall(text)
tokens = [t for t in base_tokens if t]
tokens.extend([t.lower() for t in hashtag_tokens if t])
return tokens
def compute_raw_scores(tokens: Sequence[str], lexicons: Dict[str, Dict[str, Dict[str, float]]], label_orders: Dict[str, List[str]]) -> Dict[str, List[float]]:
scores: Dict[str, Dict[str, float]] = {dim: defaultdict(float) for dim in label_orders}
for token in tokens:
for dim, token_map in lexicons.items():
if token not in token_map:
continue
for label, weight in token_map[token].items():
scores[dim][label] += weight
norm = max(1.0, math.log1p(len(tokens)))
vectorized: Dict[str, List[float]] = {}
for dim, order in label_orders.items():
vectorized[dim] = [scores[dim].get(label, 0.0) / norm for label in order]
return vectorized
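# Scoring note: raw per-label sums are divided by max(1, log1p(token_count)), so longer
# captions do not dominate simply by matching more lexicon tokens; for a hypothetical
# 10-token caption, each summed weight is scaled by roughly 1/2.4.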
def load_embedder(model_name: str, device: str) -> SentenceTransformer:
if SentenceTransformer is None:
raise ImportError("sentence-transformers is required. Install via `pip install sentence-transformers`.")
logger.info("Loading embedding model %s on device=%s", model_name, device)
return SentenceTransformer(model_name, device=device)
def embed_texts(model: SentenceTransformer, texts: List[str], batch_size: int) -> List[List[float]]:
embeddings = model.encode(
texts,
batch_size=batch_size,
show_progress_bar=False,
convert_to_numpy=True,
)
return embeddings.tolist()
def process_chunk(
df: pd.DataFrame,
lexicons: Dict[str, Dict[str, Dict[str, float]]],
phrase_vocab: Optional[set[str]],
phrase_ngram_sizes: Sequence[int],
slang_map: Dict[str, str],
label_orders: Dict[str, List[str]],
embedder: SentenceTransformer,
cfg: MinerConfig,
) -> Tuple[pa.Table, int, int]:
# First pass: cheap per-row work + min-length filter.
rows: List[object] = []
descriptions: List[Optional[str]] = []
hashtag_tags_list: List[List[str]] = []
fused_texts: List[str] = []
base_tokens_list: List[List[str]] = []
langs: List[Optional[str]] = []
needs_detect: List[bool] = []
filtered_lang = 0
filtered_short = 0
for row in df.itertuples(index=False):
description_val = first_present(row, "description", "desc")
hashtags_raw = first_present(row, "hashtags", "challenges")
hashtag_tags = extract_hashtag_tags(hashtags_raw)
description = str(description_val) if description_val is not None else None
fused_text, ht_tokens = fuse_text(description, hashtag_tags, slang_map, cfg.text_cap)
base_tokens = tokenize(fused_text, ht_tokens)
if len(fused_text) < cfg.min_chars or len(base_tokens) < cfg.min_tokens:
filtered_short += 1
continue
lang_val = getattr(row, "language", None) or getattr(row, "lang", None)
lang = None if is_nullish(lang_val) else str(lang_val).strip()
lang_norm = lang.lower() if lang else ""
need = bool(cfg.auto_language_detect and langdetect_detect and (not lang_norm or lang_norm in ("und", "unknown")))
rows.append(row)
descriptions.append(description)
hashtag_tags_list.append(hashtag_tags)
fused_texts.append(fused_text)
base_tokens_list.append(base_tokens)
langs.append(lang)
needs_detect.append(need)
# Multi-process language detection (CPU-bound).
if any(needs_detect):
detect_indices = [i for i, need in enumerate(needs_detect) if need]
detect_texts = [(descriptions[i] or "") for i in detect_indices]
t0 = time.perf_counter()
detected_langs = detect_languages_parallel(detect_texts, cfg.workers)
dt = time.perf_counter() - t0
used_workers = min(max(int(cfg.workers or 0), 1), os.cpu_count() or max(int(cfg.workers or 0), 1))
logger.info(
"Langdetect: detected %s rows in %.2fs using workers=%s",
len(detect_texts),
dt,
used_workers,
)
for idx, detected in zip(detect_indices, detected_langs):
if detected:
langs[idx] = detected
# Second pass: apply allowlist + phrase matching + scoring + record building.
records: List[Dict[str, object]] = []
texts: List[str] = []
for i, row in enumerate(rows):
lang = langs[i]
if cfg.language_allowlist:
if not lang:
filtered_lang += 1
continue
if str(lang).lower() not in cfg.language_allowlist:
filtered_lang += 1
continue
description = descriptions[i]
hashtag_tags = hashtag_tags_list[i]
fused_text = fused_texts[i]
tokens = base_tokens_list[i]
# Add matched phrases as tokens (if provided). Phrase lexicons are mined from descriptions only,
# so we match on cleaned description text (no hashtags). This runs only for rows that survive
# length + language filters (for speed).
if phrase_vocab and phrase_ngram_sizes and description:
desc_clean, _ = fuse_text(description, None, slang_map, cfg.text_cap)
desc_tokens = TOKENIZER_RE.findall(desc_clean)
matched: set[str] = set()
for n in phrase_ngram_sizes:
if n <= 1:
continue
for j in range(0, len(desc_tokens) - n + 1):
ph = " ".join(desc_tokens[j : j + n])
if ph in phrase_vocab:
matched.add(ph)
if matched:
tokens = tokens + list(matched)
texts.append(fused_text)
raw_scores = compute_raw_scores(tokens, lexicons, label_orders)
created_raw = first_present(row, "created_at", "create_time", "createTime", "collected_time", "stats_time")
created_date = None
if created_raw:
try:
created_int = int(created_raw)
created_date = pd.to_datetime(created_int, unit="s", utc=True).date().isoformat()
except Exception:
try:
created_date = str(pd.to_datetime(created_raw)).split(" ")[0]
except Exception:
created_date = None
# Derived time signals (time lexicon mining can be empty depending on inputs).
time_labels = label_orders.get("time", []) or []
if time_labels:
derived_time = compute_time_scores_derived(tokens, created_date, time_labels)
base_time = raw_scores.get("time") or [0.0] * len(time_labels)
if len(base_time) < len(time_labels):
base_time = list(base_time) + [0.0] * (len(time_labels) - len(base_time))
elif len(base_time) > len(time_labels):
base_time = list(base_time)[: len(time_labels)]
raw_scores["time"] = [float(max(a, b)) for a, b in zip(base_time, derived_time)]
video_id = coerce_id(first_present(row, "video_id", "aweme_id", "id"))
author_id = coerce_id(first_present(row, "author_id", "user_id"))
record: Dict[str, object] = {
"video_id": video_id,
"author_id": author_id,
"created_at": str(created_raw) if created_raw is not None else None,
"created_date": created_date,
"language": lang,
"description": description,
"hashtags": hashtag_tags,
"clean_text": fused_text,
"short_text": len(fused_text) < max(cfg.min_chars, 20),
"tribe_scores_raw": raw_scores.get("tribe", []),
"vibe_scores_raw": raw_scores.get("vibe", []),
"commercial_scores_raw": raw_scores.get("commercial", []),
"role_scores_raw": raw_scores.get("role", []),
"format_scores_raw": raw_scores.get("format", []),
"time_scores_raw": raw_scores.get("time", []),
}
if cfg.emit_flat_scores:
for dim, labels in label_orders.items():
scores = raw_scores.get(dim, []) or []
dim_max = float(max(scores)) if scores else 0.0
if dim == "time":
record["time_overall"] = dim_max
else:
record[f"{dim}_max"] = dim_max
for label, val in zip(labels, scores):
record[f"{dim}_{label}"] = float(val)
records.append(record)
if not records:
return pa.Table.from_pylist([]), filtered_lang, filtered_short
embeddings = embed_texts(embedder, texts, cfg.embedding_batch_size)
for rec, emb in zip(records, embeddings):
rec["embed_vec"] = emb
table = pa.Table.from_pylist(records)
return table, filtered_lang, filtered_short
def write_metadata(cfg: MinerConfig, label_orders: Dict[str, List[str]], diagnostics: Optional[Dict[str, int]] = None) -> None:
if not cfg.metadata_path:
return
meta = {
"config": asdict(cfg),
"label_orders": label_orders,
"diagnostics": diagnostics or {},
}
with open(cfg.metadata_path, "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
logger.info("Wrote metadata to %s", cfg.metadata_path)
def run(cfg: MinerConfig) -> None:
logger.info("Starting sys7_miner with batch_size=%s", cfg.batch_size)
# Prefer System 7 lexicons if configured; fall back to System 6 bundle if needed.
lexicon_path = cfg.lexicons
if not Path(lexicon_path).exists() and Path("system6_lexicons.json").exists():
lexicon_path = "system6_lexicons.json"
logger.info("Lexicon bundle %s not found; falling back to %s", cfg.lexicons, lexicon_path)
else:
logger.info("Using lexicon bundle %s", lexicon_path)
raw_lexicons = load_json(str(lexicon_path))
slang_map = prepare_slang_map(load_json(cfg.slang_lexicon))
label_orders = load_json(cfg.label_orders)
lexicons = orient_lexicons(raw_lexicons)
phrase_vocab: Optional[set[str]] = None
phrase_ngram_sizes: List[int] = []
# Merge mined phrase lexicons into scoring (multi-word tokens).
if cfg.phrase_lexicon and Path(cfg.phrase_lexicon).exists():
try:
phrase_bundle = load_json(cfg.phrase_lexicon)
by_label = phrase_bundle.get("phrases_by_label")
if not isinstance(by_label, dict):
logger.warning(
"Phrase lexicon %s has no 'phrases_by_label'; it will not affect scoring. Re-run phraseminer.py to generate phrases_by_label.",
cfg.phrase_lexicon,
)
else:
phrase_vocab = set()
sizes = set()
merged = 0
for dim, label_map in by_label.items():
if dim not in label_orders:
continue
if not isinstance(label_map, dict):
continue
for label, phrases in label_map.items():
if label not in label_orders.get(dim, []):
continue
if not isinstance(phrases, dict):
continue
for phrase, w in phrases.items():
p = str(phrase).lower().strip()
if not p:
continue
try:
w_float = float(w) * float(cfg.phrase_weight_scale)
except Exception:
continue
lexicons.setdefault(dim, {}).setdefault(p, {})
lexicons[dim][p][label] = lexicons[dim][p].get(label, 0.0) + w_float
phrase_vocab.add(p)
sizes.add(len(p.split()))
merged += 1
phrase_ngram_sizes = sorted(n for n in sizes if n >= 2)
logger.info(
"Merged phrase lexicon into scoring: %s phrases (%s label assignments), ngram_sizes=%s",
len(phrase_vocab),
merged,
phrase_ngram_sizes,
)
except Exception as exc: # pragma: no cover - defensive
logger.warning("Failed to merge phrase lexicon into scoring: %s", exc)
embedder = load_embedder(cfg.embedding_model, cfg.device)
writer: Optional[pq.ParquetWriter] = None
total_rows = 0
kept_rows = 0
filtered_lang = 0
filtered_short = 0
path_obj = Path(cfg.input_path)
is_parquet = path_obj.is_dir() or path_obj.suffix.lower() == ".parquet"
def write_table(table: pa.Table):
nonlocal writer, kept_rows
if table.num_rows == 0:
return
if writer is None:
writer = pq.ParquetWriter(cfg.output_parquet, table.schema, compression="snappy")
writer.write_table(table)
kept_rows += table.num_rows
if is_parquet:
dataset = ds.dataset(cfg.input_path, format="parquet")
available = list(dataset.schema.names)
wanted = []
for c in (
"video_id",
"aweme_id",
"id",
"author_id",
"user_id",
"created_at",
"create_time",
"createTime",
"collected_time",
"stats_time",
"description",
"desc",
"hashtags",
"challenges",
"language",
"lang",
):
if c in available and c not in wanted:
wanted.append(c)
scanner = dataset.scanner(batch_size=cfg.batch_size, columns=wanted or None)
for idx, batch in enumerate(scanner.to_batches()):
df = batch.to_pandas()
total_rows += len(df)
table, fl, fs = process_chunk(df, lexicons, phrase_vocab, phrase_ngram_sizes, slang_map, label_orders, embedder, cfg)
filtered_lang += fl
filtered_short += fs
write_table(table)
logger.info("Processed parquet batch %s (input rows %s, kept total %s)", idx, len(df), kept_rows)
else:
header = pd.read_csv(cfg.input_path, nrows=0).columns.tolist()
wanted = []
for c in (
"video_id",
"aweme_id",
"id",
"author_id",
"user_id",
"created_at",
"create_time",
"createTime",
"collected_time",
"stats_time",
"description",
"desc",
"hashtags",
"challenges",
"language",
"lang",
):
if c in header and c not in wanted:
wanted.append(c)
with pd.read_csv(cfg.input_path, chunksize=cfg.batch_size, usecols=wanted or None, dtype=str, keep_default_na=False) as reader:
for idx, chunk in enumerate(reader):
total_rows += len(chunk)
table, fl, fs = process_chunk(chunk, lexicons, phrase_vocab, phrase_ngram_sizes, slang_map, label_orders, embedder, cfg)
filtered_lang += fl
filtered_short += fs
write_table(table)
logger.info("Processed CSV chunk %s (input rows %s, kept total %s)", idx, len(chunk), kept_rows)
if writer:
writer.close()
diagnostics = {
"input_rows": total_rows,
"kept_rows": kept_rows,
"filtered_language": filtered_lang,
"filtered_short": filtered_short,
}
write_metadata(cfg, label_orders, diagnostics)
if cfg.diagnostics_path:
Path(cfg.diagnostics_path).write_text(json.dumps(diagnostics, indent=2), encoding="utf-8")
logger.info(
"Completed. Input rows: %s, kept: %s, filtered_language: %s, filtered_short: %s",
total_rows,
kept_rows,
filtered_lang,
filtered_short,
)
def parse_args() -> MinerConfig:
parser = argparse.ArgumentParser(description="System 7 Miner: clean + embed + score raw TikTok text.")
parser.add_argument("--input-path", "--input", dest="input_path", required=True, help="Path to CSV or Parquet directory (e.g., Data/hf10m/Small)")
parser.add_argument("--output-parquet", default="tiktok10m_sys7_features.parquet", help="Output parquet path")
parser.add_argument(
"--lexicons",
default=None,
help="Preferred lexicon bundle (default: system7_lexicons.json if present, else system6_lexicons.json).",
)
parser.add_argument(
"--phrase-lexicon",
default=None,
help="Optional phrase lexicon JSON from phraseminer.py (uses phrases_by_label to score multi-word phrases).",
)
parser.add_argument(
"--phrase-weight-scale",
type=float,
default=1.0,
help="Scale factor applied to phrase lexicon weights when merged into scoring.",
)
parser.add_argument(
"--system6-lexicons",
default="system6_lexicons.json",
help="Fallback System 6 lexicon bundle (used if system7_lexicons.json is missing).",
)
parser.add_argument("--slang-lexicon", default="slang_lexicon.json", help="Path to slang_lexicon.json")
parser.add_argument("--label-orders", default="label_orders.json", help="Path to label_orders.json")
parser.add_argument("--batch-size", type=int, default=50_000)
parser.add_argument("--text-cap", type=int, default=1024)
parser.add_argument("--embedding-model", default="sentence-transformers/all-MiniLM-L6-v2")
parser.add_argument("--embedding-batch-size", type=int, default=128)
parser.add_argument("--device", default="cpu", help="Embedding device: cpu or cuda")
parser.add_argument(
"--workers",
type=int,
default=2,
help="Number of worker processes for CPU-bound language detection when --auto-language-detect is set (default: 2).",
)
parser.add_argument("--metadata-path", default="sys7_miner_metadata.json")
parser.add_argument("--diagnostics-path", default="sys7_miner_diagnostics.json")
parser.add_argument("--min-chars", type=int, default=10, help="Minimum clean_text length to keep")
parser.add_argument("--min-tokens", type=int, default=2, help="Minimum token count to keep")
parser.add_argument(
"--emit-flat-scores",
action="store_true",
help="If set, emit *_max and per-label scalar columns (e.g., tribe_MomTok) in addition to list score vectors.",
)
parser.add_argument(
"--language-allowlist",
default=None,
help="Comma-separated list of language codes to keep (e.g., en,fr). "
"Combined with optional --auto-language-detect to filter down to core languages.",
)
parser.add_argument(
"--auto-language-detect",
action="store_true",
help="If set, auto-detect language for rows with missing/unknown language using langdetect, when available.",
)
args = parser.parse_args()
lexicon_path = args.lexicons
if not lexicon_path:
# Default preference: system7_lexicons.json if present, else System 6 bundle.
default_sys7 = Path("system7_lexicons.json")
lexicon_path = str(default_sys7) if default_sys7.exists() else args.system6_lexicons
return MinerConfig(
input_path=args.input_path,
output_parquet=args.output_parquet,
lexicons=lexicon_path,
phrase_lexicon=args.phrase_lexicon,
phrase_weight_scale=float(args.phrase_weight_scale),
slang_lexicon=args.slang_lexicon,
label_orders=args.label_orders,
batch_size=args.batch_size,
text_cap=args.text_cap,
embedding_model=args.embedding_model,
embedding_batch_size=args.embedding_batch_size,
device=args.device,
workers=args.workers,
metadata_path=args.metadata_path,
diagnostics_path=args.diagnostics_path,
min_chars=args.min_chars,
min_tokens=args.min_tokens,
language_allowlist=[s.strip().lower() for s in args.language_allowlist.split(",")] if args.language_allowlist else None,
auto_language_detect=bool(args.auto_language_detect),
emit_flat_scores=bool(args.emit_flat_scores),
)
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)
cfg = parse_args()
run(cfg)
if __name__ == "__main__":
main()