| """ | |
| System 7 Miner | |
| --------------- | |
| Module 1: Clean TikTok rows, apply lexicon-derived raw signals, and emit feature vectors. | |
| Core outputs: | |
| - tiktok10m_sys7_features.parquet | |
| - optional sys7_miner_metadata.json | |
| This script is designed to stream large CSVs in chunks (no 10M-row load), | |
| normalize text with ftfy, compute raw lexical scores from System 6 lexicons, | |
| and generate contextual embeddings via sentence-transformers. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import re | |
| import time | |
| from collections import defaultdict | |
| from concurrent.futures import ProcessPoolExecutor | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Sequence, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import pyarrow as pa | |
| import pyarrow.parquet as pq | |
| import pyarrow.dataset as ds | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| except ImportError: # pragma: no cover - handled at runtime | |
| SentenceTransformer = None # type: ignore | |
| try: | |
| import ftfy | |
| except ImportError: # pragma: no cover - handled at runtime | |
| ftfy = None # type: ignore | |
| try: | |
| from langdetect import detect as langdetect_detect | |
| from langdetect.detector_factory import DetectorFactory | |
| except ImportError: # pragma: no cover - optional dependency | |
| langdetect_detect = None # type: ignore | |
| else: # pragma: no cover - runtime behavior | |
| # Make langdetect deterministic across runs/processes. | |
| DetectorFactory.seed = 0 | |
logger = logging.getLogger("sys7_miner")
# Text-cleaning / tokenization regexes (compiled once at import time).
CONTROL_CHARS_RE = re.compile(r"[\u0000-\u0008\u000b\u000c\u000e-\u001f]")  # ASCII control chars, excluding \t \n \r
REPLACEMENT_RUN_RE = re.compile("�{2,}")  # runs of U+FFFD replacement chars left by bad decoding
WHITESPACE_RE = re.compile(r"\s+")  # any whitespace run (collapsed to a single space)
HASHTAG_SPLIT_RE = re.compile(r"[A-Z]?[a-z]+|[0-9]+")  # CamelCase-word / digit pieces inside a hashtag
TOKENIZER_RE = re.compile(r"[a-z0-9']+")  # word tokens from already-lowercased text
TIME_ALNUM_RE = re.compile(r"[^a-z0-9]+")  # stripped when normalizing time-related tokens
# Holiday/event keyword -> calendar month. Keys are substring-matched against
# normalized tokens in _time_detect_month_from_token.
TIME_HOLIDAY_MONTH_MAP: Dict[str, int] = {
    "newyear": 1,
    "newyears": 1,
    "nye": 1,
    "valentine": 2,
    "valentines": 2,
    "stpatrick": 3,
    "easter": 4,
    "mothersday": 5,
    "memorial": 5,
    "juneteenth": 6,
    "pride": 6,
    "father": 6,
    "independence": 7,
    "july4": 7,
    "labor": 9,
    "halloween": 10,
    "thanksgiving": 11,
    "blackfriday": 11,
    "cybermonday": 11,
    "christmas": 12,
    "xmas": 12,
    "hanukkah": 12,
}
# Month names and abbreviations -> month number (exact-match lookup).
TIME_MONTH_KEYWORDS: Dict[str, int] = {
    "january": 1,
    "jan": 1,
    "february": 2,
    "feb": 2,
    "march": 3,
    "mar": 3,
    "april": 4,
    "apr": 4,
    "may": 5,
    "june": 6,
    "jun": 6,
    "july": 7,
    "jul": 7,
    "august": 8,
    "aug": 8,
    "september": 9,
    "sept": 9,
    "sep": 9,
    "october": 10,
    "oct": 10,
    "november": 11,
    "nov": 11,
    "december": 12,
    "dec": 12,
}
# Season-ish terms that count toward the "seasonal" time signal.
TIME_SEASON_TERMS = {
    "spring",
    "summer",
    "fall",
    "autumn",
    "winter",
    "backtoschool",
    "graduation",
}
# Hashtag tokens that signal virality-chasing content ("viral" time signal).
TIME_VIRAL_TOKENS = {
    "fyp",
    "foryou",
    "foryoupage",
    "viral",
    "trending",
    "trend",
    "xyzbca",
}
def _time_normalize_token(token: str) -> str:
    """Lowercase *token* and strip every non-alphanumeric character."""
    lowered = (token or "").lower()
    return TIME_ALNUM_RE.sub("", lowered)
| def _time_parse_created_month(created_date: Optional[str]) -> Optional[int]: | |
| if not created_date: | |
| return None | |
| try: | |
| parts = str(created_date).split("T", 1)[0].split(" ", 1)[0].split("-", 2) | |
| if len(parts) >= 2: | |
| m = int(parts[1]) | |
| if 1 <= m <= 12: | |
| return m | |
| except Exception: | |
| return None | |
| return None | |
def _time_detect_month_from_token(token: str) -> Optional[int]:
    """Resolve a token to a month via month names, then holiday substrings."""
    norm = _time_normalize_token(token)
    if not norm:
        return None
    month = TIME_MONTH_KEYWORDS.get(norm)
    if month is not None:
        return month
    # Holiday keywords match as substrings (e.g. "christmas2023").
    return next(
        (m for holiday, m in TIME_HOLIDAY_MONTH_MAP.items() if holiday in norm),
        None,
    )
| def _time_squash_hits(hits: int, *, base: float = 0.35, step: float = 0.15) -> float: | |
| if hits <= 0: | |
| return 0.0 | |
| return float(min(1.0, base + step * (hits - 1))) | |
def compute_time_scores_derived(tokens: Sequence[str], created_date: Optional[str], label_order: Sequence[str]) -> List[float]:
    """Derive seasonal/viral scores from tokens, with a small bonus when the
    dominant token month agrees with the post's creation month."""
    season_count = 0
    viral_count = 0
    month_counts: Dict[int, int] = defaultdict(int)
    for raw_tok in tokens or []:
        tok = _time_normalize_token(str(raw_tok))
        if not tok:
            continue
        if tok in TIME_VIRAL_TOKENS:
            viral_count += 1
        if tok in TIME_SEASON_TERMS:
            season_count += 1
        month = _time_detect_month_from_token(tok)
        if month is not None:
            # Month/holiday mentions also count as seasonal evidence.
            season_count += 1
            month_counts[month] += 1
    seasonal_score = _time_squash_hits(season_count)
    viral_score = _time_squash_hits(viral_count)
    post_month = _time_parse_created_month(created_date)
    if post_month and month_counts:
        top_month, _ = max(month_counts.items(), key=lambda kv: kv[1])
        if top_month == post_month and seasonal_score > 0:
            seasonal_score = float(min(1.0, seasonal_score + 0.05))
    label_scores = {"seasonal": seasonal_score, "viral": viral_score}
    return [float(label_scores.get(label, 0.0)) for label in label_order]
@dataclass
class MinerConfig:
    """Runtime configuration for the System 7 miner.

    Fix: this was a plain annotated class, but parse_args() constructs it
    with keyword arguments and write_metadata() serializes it via
    dataclasses.asdict() — both require the @dataclass decorator, which
    was missing.
    """

    # Required: path to the input CSV file or a Parquet file/directory.
    input_path: str
    output_parquet: str = "tiktok10m_sys7_features.parquet"
    # Preferred lexicon bundle; run() falls back to system6_lexicons.json if absent.
    lexicons: str = "system7_lexicons.json"
    phrase_lexicon: Optional[str] = None
    # Scale factor applied to phrase-lexicon weights when merged into scoring.
    phrase_weight_scale: float = 1.0
    slang_lexicon: str = "slang_lexicon.json"
    label_orders: str = "label_orders.json"
    # Rows per streamed chunk (keeps peak memory bounded on 10M-row inputs).
    batch_size: int = 50_000
    # Hard cap on fused clean_text length, in characters.
    text_cap: int = 1024
    embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"
    embedding_batch_size: int = 128
    device: str = "cpu"
    # Worker processes for langdetect when auto_language_detect is enabled.
    workers: int = 2
    metadata_path: Optional[str] = "sys7_miner_metadata.json"
    diagnostics_path: Optional[str] = "sys7_miner_diagnostics.json"
    # Minimum clean_text length / token count for a row to be kept.
    min_chars: int = 10
    min_tokens: int = 2
    language_allowlist: Optional[List[str]] = None  # e.g., ["en"]
    auto_language_detect: bool = False
    emit_flat_scores: bool = False
def load_json(path: str) -> Dict:
    """Read a UTF-8 JSON file and return the parsed object."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)
def auto_detect_language(text: str) -> Optional[str]:
    """Best-effort language detection using langdetect, if available."""
    if not langdetect_detect or not text:
        return None
    # langdetect cost scales with text length; captions + hashtags can be
    # long and noisy, so truncate to keep detection cheap and stable.
    sample = text[:400] if len(text) > 400 else text
    try:
        return str(langdetect_detect(sample))
    except Exception:
        return None
def detect_languages_parallel(texts: List[str], workers: int) -> List[Optional[str]]:
    """Detect languages for many texts, optionally using multiprocessing.

    langdetect is CPU-bound; a ProcessPool sidesteps the GIL.
    """
    if not texts:
        return []
    worker_count = int(workers or 0)
    if worker_count <= 1 or not langdetect_detect:
        # Serial fallback: no pool requested or langdetect unavailable.
        return [auto_detect_language(text) for text in texts]
    # Clamp to avoid spawning an excessive number of processes.
    pool_size = min(worker_count, os.cpu_count() or worker_count)
    # A large chunksize is critical when mapping many short strings.
    with ProcessPoolExecutor(max_workers=pool_size) as pool:
        return list(pool.map(auto_detect_language, texts, chunksize=256))
def is_nullish(val) -> bool:
    """True for None, NaN floats, and empty/whitespace-only strings."""
    if val is None:
        return True
    if isinstance(val, str):
        return val.strip() == ""
    if isinstance(val, float):
        return math.isnan(val)
    return False
def coerce_id(val) -> Optional[str]:
    """Coerce IDs to strings (safe for very large integers)."""
    if is_nullish(val):
        return None
    if isinstance(val, (int, np.integer)):
        return str(int(val))
    if isinstance(val, (float, np.floating)):
        numeric = float(val)
        if math.isnan(numeric):
            return None
        # Whole-valued floats (e.g. 12345.0 from a float cast) become "12345".
        return str(int(numeric)) if numeric.is_integer() else str(val)
    text = str(val).strip()
    if text.endswith(".0"):
        # Strings like "12345.0" usually come from float-cast IDs.
        try:
            return str(int(float(text)))
        except Exception:
            pass
    return text
def first_present(row, *names: str):
    """Return the first non-nullish attribute of *row* among *names*, else None."""
    for attr in names:
        if not hasattr(row, attr):
            continue
        candidate = getattr(row, attr, None)
        if not is_nullish(candidate):
            return candidate
    return None
def extract_hashtag_tags(raw) -> List[str]:
    """
    Extract a list of hashtag/challenge strings from a variety of formats:
    - JSON string list: '["tag1","tag2"]'
    - JSON string list of objects: '[{"title":"tag"}]'
    - whitespace/comma separated string
    - list/tuple of strings (or dicts)
    Returns an empty list for nullish input; recurses on embedded JSON lists.
    """
    if is_nullish(raw):
        return []
    if isinstance(raw, (list, tuple)):
        out: List[str] = []
        for item in raw:
            if is_nullish(item):
                continue
            if isinstance(item, str):
                out.append(item.strip())
                continue
            if isinstance(item, dict):
                # Take the first recognized key holding a non-empty string.
                for k in ("title", "name", "tag", "challenge", "challenge_name", "text"):
                    v = item.get(k)
                    if isinstance(v, str) and v.strip():
                        out.append(v.strip())
                        break
        return [t for t in out if t]
    if isinstance(raw, str):
        s = raw.strip()
        if not s:
            return []
        if s[:1] in ("[", "{"):
            # Looks like JSON: parse and recurse on the embedded list/dict value.
            try:
                obj = json.loads(s)
                if isinstance(obj, list):
                    return extract_hashtag_tags(obj)
                if isinstance(obj, dict):
                    for k in ("hashtags", "challenges", "tags"):
                        v = obj.get(k)
                        if v is not None:
                            return extract_hashtag_tags(v)
            except Exception:
                # Malformed JSON: fall through to plain-string handling below.
                pass
        # Fallback: find #tags, else split on commas/whitespace
        tags = re.findall(r"#([A-Za-z0-9_]+)", s)
        if tags:
            return tags
        parts = [p.strip().strip('"').strip("'").strip("[](){}") for p in re.split(r"[,\s]+", s) if p.strip()]
        return [p.lstrip("#") for p in parts if p and p != "#"]
    return []
def prepare_slang_map(raw) -> Dict[str, str]:
    """
    Normalize slang lexicon to token -> canonical mapping.

    Accepts a dict (already a mapping) or a list of objects with 'token'
    (or 'slang'/'term') and optional 'canonical' (or 'value') keys.
    Fix: any other input shape (None, scalar, ...) now yields an empty dict
    instead of falling off the end and returning None.

    NOTE(review): the dict branch preserves the canonical value's original
    casing while the list branch lowercases it; kept as-is for backward
    compatibility — confirm whether both should be lowercased.
    """
    mapping: Dict[str, str] = {}
    if isinstance(raw, dict):
        for k, v in raw.items():
            mapping[str(k).lower()] = str(v) if v is not None else str(k).lower()
    elif isinstance(raw, list):
        for item in raw:
            if not isinstance(item, dict):
                continue
            token = item.get("token") or item.get("slang") or item.get("term")
            canonical = item.get("canonical") or item.get("value") or token
            if token:
                mapping[str(token).lower()] = str(canonical).lower()
    return mapping
def orient_lexicons(raw_lexicons: Dict[str, Dict]) -> Dict[str, Dict[str, Dict[str, float]]]:
    """
    Normalize lexicons into dim -> token -> {label: weight}.

    Accepts lexicons shaped as label -> token -> weight, where weight may be
    a bare number or an object like {"weight": w}.  Non-dict top-level
    entries (config/sources) are skipped, as are weights that cannot be
    coerced to float.
    """
    oriented: Dict[str, Dict[str, Dict[str, float]]] = {}
    for dim, label_block in raw_lexicons.items():
        if not isinstance(label_block, dict):
            # Skip config/metadata blocks.
            continue
        per_token: Dict[str, Dict[str, float]] = defaultdict(dict)
        for label, token_weights in label_block.items():
            if not isinstance(token_weights, dict):
                continue
            for token, raw_weight in token_weights.items():
                if isinstance(raw_weight, dict) and "weight" in raw_weight:
                    raw_weight = raw_weight["weight"]
                try:
                    per_token[token.lower()][label] = float(raw_weight)
                except (TypeError, ValueError):
                    continue
        oriented[dim] = per_token
    return oriented
def standardize_quotes(text: str) -> str:
    """Replace curly/angled quote characters with plain ASCII quotes."""
    # Single-pass character translation instead of chained .replace() calls.
    return text.translate(
        {
            ord("“"): '"',
            ord("”"): '"',
            ord("‘"): "'",
            ord("’"): "'",
            ord("´"): "'",
        }
    )
def split_hashtag(tag: str) -> List[str]:
    """Break a hashtag into lowercase word/number pieces; fall back to the whole tag."""
    pieces = HASHTAG_SPLIT_RE.findall(tag)
    if not pieces:
        return [tag.lower()]
    return [piece.lower() for piece in pieces]
def normalize_hashtags(raw: Optional[str], slang_map: Dict[str, str]) -> List[str]:
    """Split raw hashtag input, expand CamelCase tags, and apply slang mapping."""
    if raw is None or (isinstance(raw, float) and math.isnan(raw)):
        return []
    if isinstance(raw, str):
        parts = re.split(r"[,\s]+", raw)
    elif isinstance(raw, (list, tuple)):
        parts = raw
    else:
        # Unsupported container type: nothing to normalize.
        return []
    result: List[str] = []
    for piece in parts:
        if not piece:
            continue
        tag = piece.lstrip("#").strip()
        if not tag:
            continue
        for sub in split_hashtag(tag):
            canonical = slang_map.get(sub, sub)
            if canonical:
                result.append(canonical)
    return result
def fuse_text(description: Optional[str], hashtags: Optional[str | Sequence[str]], slang_map: Dict[str, str], cap: int) -> Tuple[str, List[str]]:
    """Fuse description + normalized hashtags into one cleaned, lowercased string.

    Returns (fused_text, hashtag_tokens).  Cleaning steps: optional ftfy
    mojibake repair, control-char removal, replacement-char-run removal,
    quote standardization, whitespace collapse, and an optional length cap.
    """
    desc = "" if description is None or (isinstance(description, float) and math.isnan(description)) else str(description)
    ht_tokens = normalize_hashtags(hashtags, slang_map)
    hashtag_text = " ".join(ht_tokens)
    fused = " ".join(filter(None, [desc, hashtag_text])).lower()
    # The module docstring promises ftfy normalization but this call was
    # commented out; re-enabled behind the import guard so a missing ftfy
    # install remains a no-op.
    if ftfy is not None:
        fused = ftfy.fix_text(fused, normalization="NFC")
    fused = CONTROL_CHARS_RE.sub(" ", fused)
    fused = REPLACEMENT_RUN_RE.sub(" ", fused)
    fused = standardize_quotes(fused)
    fused = WHITESPACE_RE.sub(" ", fused).strip()
    if cap and len(fused) > cap:
        fused = fused[:cap]
    return fused, ht_tokens
def tokenize(text: str, hashtag_tokens: Sequence[str]) -> List[str]:
    """Word tokens from the fused text plus the (lowercased) hashtag tokens."""
    tokens: List[str] = [tok for tok in TOKENIZER_RE.findall(text) if tok]
    for tag in hashtag_tokens:
        if tag:
            tokens.append(tag.lower())
    return tokens
def compute_raw_scores(tokens: Sequence[str], lexicons: Dict[str, Dict[str, Dict[str, float]]], label_orders: Dict[str, List[str]]) -> Dict[str, List[float]]:
    """Sum lexicon weights per (dim, label) over tokens and length-normalize.

    Normalization divides by max(1, ln(1 + token_count)) so longer texts do
    not dominate.  Fix: dimensions present in `lexicons` but absent from
    `label_orders` are now skipped — the original raised KeyError on them.
    """
    scores: Dict[str, Dict[str, float]] = {dim: defaultdict(float) for dim in label_orders}
    for token in tokens:
        for dim, token_map in lexicons.items():
            dim_scores = scores.get(dim)
            if dim_scores is None or token not in token_map:
                continue
            for label, weight in token_map[token].items():
                dim_scores[label] += weight
    norm = max(1.0, math.log1p(len(tokens)))
    return {
        dim: [scores[dim].get(label, 0.0) / norm for label in order]
        for dim, order in label_orders.items()
    }
def load_embedder(model_name: str, device: str) -> SentenceTransformer:
    """Construct the sentence-transformers embedder on the requested device.

    Raises ImportError when the optional dependency is not installed.
    """
    if SentenceTransformer is None:
        raise ImportError(
            "sentence-transformers is required. Install via `pip install sentence-transformers`."
        )
    logger.info("Loading embedding model %s on device=%s", model_name, device)
    return SentenceTransformer(model_name, device=device)
def embed_texts(model: SentenceTransformer, texts: List[str], batch_size: int) -> List[List[float]]:
    """Encode *texts* into embedding vectors, returned as plain Python lists."""
    matrix = model.encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=False,
        convert_to_numpy=True,
    )
    return matrix.tolist()
def process_chunk(
    df: pd.DataFrame,
    lexicons: Dict[str, Dict[str, Dict[str, float]]],
    phrase_vocab: Optional[set[str]],
    phrase_ngram_sizes: Sequence[int],
    slang_map: Dict[str, str],
    label_orders: Dict[str, List[str]],
    embedder: SentenceTransformer,
    cfg: MinerConfig,
) -> Tuple[pa.Table, int, int]:
    """Clean, filter, score, and embed one chunk of input rows.

    Returns (arrow_table, filtered_language_count, filtered_short_count).
    Rows are dropped when the cleaned text is too short or (optionally) when
    their language is not in cfg.language_allowlist; surviving rows get
    per-dimension lexicon score vectors plus an "embed_vec" embedding column.
    """
    # First pass: cheap per-row work + min-length filter.
    rows: List[object] = []
    descriptions: List[Optional[str]] = []
    hashtag_tags_list: List[List[str]] = []
    fused_texts: List[str] = []
    base_tokens_list: List[List[str]] = []
    langs: List[Optional[str]] = []
    needs_detect: List[bool] = []
    filtered_lang = 0
    filtered_short = 0
    for row in df.itertuples(index=False):
        description_val = first_present(row, "description", "desc")
        hashtags_raw = first_present(row, "hashtags", "challenges")
        hashtag_tags = extract_hashtag_tags(hashtags_raw)
        description = str(description_val) if description_val is not None else None
        fused_text, ht_tokens = fuse_text(description, hashtag_tags, slang_map, cfg.text_cap)
        base_tokens = tokenize(fused_text, ht_tokens)
        if len(fused_text) < cfg.min_chars or len(base_tokens) < cfg.min_tokens:
            filtered_short += 1
            continue
        lang_val = getattr(row, "language", None) or getattr(row, "lang", None)
        lang = None if is_nullish(lang_val) else str(lang_val).strip()
        lang_norm = lang.lower() if lang else ""
        # Detection is only needed when enabled, available, and the source
        # language is missing or explicitly unknown.
        need = bool(cfg.auto_language_detect and langdetect_detect and (not lang_norm or lang_norm in ("und", "unknown")))
        rows.append(row)
        descriptions.append(description)
        hashtag_tags_list.append(hashtag_tags)
        fused_texts.append(fused_text)
        base_tokens_list.append(base_tokens)
        langs.append(lang)
        needs_detect.append(need)
    # Multi-process language detection (CPU-bound).
    if any(needs_detect):
        detect_indices = [i for i, need in enumerate(needs_detect) if need]
        # Detection runs on the raw description, not the fused text.
        detect_texts = [(descriptions[i] or "") for i in detect_indices]
        t0 = time.perf_counter()
        detected_langs = detect_languages_parallel(detect_texts, cfg.workers)
        dt = time.perf_counter() - t0
        used_workers = min(max(int(cfg.workers or 0), 1), os.cpu_count() or max(int(cfg.workers or 0), 1))
        logger.info(
            "Langdetect: detected %s rows in %.2fs using workers=%s",
            len(detect_texts),
            dt,
            used_workers,
        )
        for idx, detected in zip(detect_indices, detected_langs):
            if detected:
                langs[idx] = detected
    # Second pass: apply allowlist + phrase matching + scoring + record building.
    records: List[Dict[str, object]] = []
    texts: List[str] = []
    for i, row in enumerate(rows):
        lang = langs[i]
        if cfg.language_allowlist:
            if not lang:
                filtered_lang += 1
                continue
            if str(lang).lower() not in cfg.language_allowlist:
                filtered_lang += 1
                continue
        description = descriptions[i]
        hashtag_tags = hashtag_tags_list[i]
        fused_text = fused_texts[i]
        tokens = base_tokens_list[i]
        # Add matched phrases as tokens (if provided). Phrase lexicons are mined from descriptions only,
        # so we match on cleaned description text (no hashtags). This runs only for rows that survive
        # length + language filters (for speed).
        if phrase_vocab and phrase_ngram_sizes and description:
            desc_clean, _ = fuse_text(description, None, slang_map, cfg.text_cap)
            desc_tokens = TOKENIZER_RE.findall(desc_clean)
            matched: set[str] = set()
            for n in phrase_ngram_sizes:
                if n <= 1:
                    continue
                # Slide an n-token window over the description tokens.
                for j in range(0, len(desc_tokens) - n + 1):
                    ph = " ".join(desc_tokens[j : j + n])
                    if ph in phrase_vocab:
                        matched.add(ph)
            if matched:
                tokens = tokens + list(matched)
        texts.append(fused_text)
        raw_scores = compute_raw_scores(tokens, lexicons, label_orders)
        created_raw = first_present(row, "created_at", "create_time", "createTime", "collected_time", "stats_time")
        created_date = None
        if created_raw:
            try:
                # Try epoch seconds first (common TikTok dump format)...
                created_int = int(created_raw)
                created_date = pd.to_datetime(created_int, unit="s", utc=True).date().isoformat()
            except Exception:
                try:
                    # ...then fall back to letting pandas parse a date string.
                    created_date = str(pd.to_datetime(created_raw)).split(" ")[0]
                except Exception:
                    created_date = None
        # Derived time signals (time lexicon mining can be empty depending on inputs).
        time_labels = label_orders.get("time", []) or []
        if time_labels:
            derived_time = compute_time_scores_derived(tokens, created_date, time_labels)
            base_time = raw_scores.get("time") or [0.0] * len(time_labels)
            # Pad/trim the mined vector so it aligns with the label order.
            if len(base_time) < len(time_labels):
                base_time = list(base_time) + [0.0] * (len(time_labels) - len(base_time))
            elif len(base_time) > len(time_labels):
                base_time = list(base_time)[: len(time_labels)]
            # Element-wise max keeps the stronger of mined vs derived signals.
            raw_scores["time"] = [float(max(a, b)) for a, b in zip(base_time, derived_time)]
        video_id = coerce_id(first_present(row, "video_id", "aweme_id", "id"))
        author_id = coerce_id(first_present(row, "author_id", "user_id"))
        record: Dict[str, object] = {
            "video_id": video_id,
            "author_id": author_id,
            "created_at": str(created_raw) if created_raw is not None else None,
            "created_date": created_date,
            "language": lang,
            "description": description,
            "hashtags": hashtag_tags,
            "clean_text": fused_text,
            "short_text": len(fused_text) < max(cfg.min_chars, 20),
            "tribe_scores_raw": raw_scores.get("tribe", []),
            "vibe_scores_raw": raw_scores.get("vibe", []),
            "commercial_scores_raw": raw_scores.get("commercial", []),
            "role_scores_raw": raw_scores.get("role", []),
            "format_scores_raw": raw_scores.get("format", []),
            "time_scores_raw": raw_scores.get("time", []),
        }
        if cfg.emit_flat_scores:
            # Optionally flatten list vectors into scalar per-label columns.
            for dim, labels in label_orders.items():
                scores = raw_scores.get(dim, []) or []
                dim_max = float(max(scores)) if scores else 0.0
                if dim == "time":
                    record["time_overall"] = dim_max
                else:
                    record[f"{dim}_max"] = dim_max
                for label, val in zip(labels, scores):
                    record[f"{dim}_{label}"] = float(val)
        records.append(record)
    if not records:
        return pa.Table.from_pylist([]), filtered_lang, filtered_short
    # Embed only the texts of surviving rows, in one batched call.
    embeddings = embed_texts(embedder, texts, cfg.embedding_batch_size)
    for rec, emb in zip(records, embeddings):
        rec["embed_vec"] = emb
    table = pa.Table.from_pylist(records)
    return table, filtered_lang, filtered_short
def write_metadata(cfg: MinerConfig, label_orders: Dict[str, List[str]], diagnostics: Optional[Dict[str, int]] = None) -> None:
    """Persist run configuration, label orders, and diagnostics as JSON.

    No-op when cfg.metadata_path is unset.
    """
    if not cfg.metadata_path:
        return
    payload = {
        "config": asdict(cfg),
        "label_orders": label_orders,
        "diagnostics": diagnostics or {},
    }
    with open(cfg.metadata_path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    logger.info("Wrote metadata to %s", cfg.metadata_path)
def run(cfg: MinerConfig) -> None:
    """Execute the full mining pipeline.

    Loads lexicons/slang/label orders, optionally merges a phrase lexicon,
    then streams the input (CSV chunks or Parquet batches) through
    process_chunk, writing kept rows to a single Parquet file and finally
    emitting metadata + diagnostics JSON.
    """
    logger.info("Starting sys7_miner with batch_size=%s", cfg.batch_size)
    # Prefer System 7 lexicons if configured; fall back to System 6 bundle if needed.
    lexicon_path = cfg.lexicons
    if not Path(lexicon_path).exists() and Path("system6_lexicons.json").exists():
        lexicon_path = "system6_lexicons.json"
        logger.info("Lexicon bundle %s not found; falling back to %s", cfg.lexicons, lexicon_path)
    else:
        logger.info("Using lexicon bundle %s", lexicon_path)
    raw_lexicons = load_json(str(lexicon_path))
    slang_map = prepare_slang_map(load_json(cfg.slang_lexicon))
    label_orders = load_json(cfg.label_orders)
    lexicons = orient_lexicons(raw_lexicons)
    phrase_vocab: Optional[set[str]] = None
    phrase_ngram_sizes: List[int] = []
    # Merge mined phrase lexicons into scoring (multi-word tokens).
    if cfg.phrase_lexicon and Path(cfg.phrase_lexicon).exists():
        try:
            phrase_bundle = load_json(cfg.phrase_lexicon)
            by_label = phrase_bundle.get("phrases_by_label")
            if not isinstance(by_label, dict):
                logger.warning(
                    "Phrase lexicon %s has no 'phrases_by_label'; it will not affect scoring. Re-run phraseminer.py to generate phrases_by_label.",
                    cfg.phrase_lexicon,
                )
            else:
                phrase_vocab = set()
                sizes = set()
                merged = 0
                for dim, label_map in by_label.items():
                    # Only merge dims/labels that exist in the configured label orders.
                    if dim not in label_orders:
                        continue
                    if not isinstance(label_map, dict):
                        continue
                    for label, phrases in label_map.items():
                        if label not in label_orders.get(dim, []):
                            continue
                        if not isinstance(phrases, dict):
                            continue
                        for phrase, w in phrases.items():
                            p = str(phrase).lower().strip()
                            if not p:
                                continue
                            try:
                                w_float = float(w) * float(cfg.phrase_weight_scale)
                            except Exception:
                                continue
                            # Accumulate the phrase weight into the token lexicon
                            # under this dim/label (phrases act as multi-word tokens).
                            lexicons.setdefault(dim, {}).setdefault(p, {})
                            lexicons[dim][p][label] = lexicons[dim][p].get(label, 0.0) + w_float
                            phrase_vocab.add(p)
                            sizes.add(len(p.split()))
                            merged += 1
                phrase_ngram_sizes = sorted(n for n in sizes if n >= 2)
                logger.info(
                    "Merged phrase lexicon into scoring: %s phrases (%s label assignments), ngram_sizes=%s",
                    len(phrase_vocab),
                    merged,
                    phrase_ngram_sizes,
                )
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning("Failed to merge phrase lexicon into scoring: %s", exc)
    embedder = load_embedder(cfg.embedding_model, cfg.device)
    writer: Optional[pq.ParquetWriter] = None
    total_rows = 0
    kept_rows = 0
    filtered_lang = 0
    filtered_short = 0
    path_obj = Path(cfg.input_path)
    # A directory or a .parquet suffix means Parquet input; anything else is CSV.
    is_parquet = path_obj.is_dir() or path_obj.suffix.lower() == ".parquet"
    def write_table(table: pa.Table):
        # Lazily create the writer with the first non-empty table's schema.
        nonlocal writer, kept_rows
        if table.num_rows == 0:
            return
        if writer is None:
            writer = pq.ParquetWriter(cfg.output_parquet, table.schema, compression="snappy")
        writer.write_table(table)
        kept_rows += table.num_rows
    if is_parquet:
        dataset = ds.dataset(cfg.input_path, format="parquet")
        available = list(dataset.schema.names)
        # Project only the columns the miner actually reads (whichever aliases exist).
        wanted = []
        for c in (
            "video_id",
            "aweme_id",
            "id",
            "author_id",
            "user_id",
            "created_at",
            "create_time",
            "createTime",
            "collected_time",
            "stats_time",
            "description",
            "desc",
            "hashtags",
            "challenges",
            "language",
            "lang",
        ):
            if c in available and c not in wanted:
                wanted.append(c)
        scanner = dataset.scanner(batch_size=cfg.batch_size, columns=wanted or None)
        for idx, batch in enumerate(scanner.to_batches()):
            df = batch.to_pandas()
            total_rows += len(df)
            table, fl, fs = process_chunk(df, lexicons, phrase_vocab, phrase_ngram_sizes, slang_map, label_orders, embedder, cfg)
            filtered_lang += fl
            filtered_short += fs
            write_table(table)
            logger.info("Processed parquet batch %s (input rows %s, kept total %s)", idx, len(df), kept_rows)
    else:
        # CSV path: read the header first so usecols only names existing columns.
        header = pd.read_csv(cfg.input_path, nrows=0).columns.tolist()
        wanted = []
        for c in (
            "video_id",
            "aweme_id",
            "id",
            "author_id",
            "user_id",
            "created_at",
            "create_time",
            "createTime",
            "collected_time",
            "stats_time",
            "description",
            "desc",
            "hashtags",
            "challenges",
            "language",
            "lang",
        ):
            if c in header and c not in wanted:
                wanted.append(c)
        # dtype=str + keep_default_na=False: keep IDs verbatim, no NaN coercion.
        with pd.read_csv(cfg.input_path, chunksize=cfg.batch_size, usecols=wanted or None, dtype=str, keep_default_na=False) as reader:
            for idx, chunk in enumerate(reader):
                total_rows += len(chunk)
                table, fl, fs = process_chunk(chunk, lexicons, phrase_vocab, phrase_ngram_sizes, slang_map, label_orders, embedder, cfg)
                filtered_lang += fl
                filtered_short += fs
                write_table(table)
                logger.info("Processed CSV chunk %s (input rows %s, kept total %s)", idx, len(chunk), kept_rows)
    if writer:
        writer.close()
    diagnostics = {
        "input_rows": total_rows,
        "kept_rows": kept_rows,
        "filtered_language": filtered_lang,
        "filtered_short": filtered_short,
    }
    write_metadata(cfg, label_orders, diagnostics)
    if cfg.diagnostics_path:
        Path(cfg.diagnostics_path).write_text(json.dumps(diagnostics, indent=2), encoding="utf-8")
    logger.info(
        "Completed. Input rows: %s, kept: %s, filtered_language: %s, filtered_short: %s",
        total_rows,
        kept_rows,
        filtered_lang,
        filtered_short,
    )
def parse_args() -> MinerConfig:
    """Parse command-line arguments and return a populated MinerConfig."""
    parser = argparse.ArgumentParser(description="System 7 Miner: clean + embed + score raw TikTok text.")
    parser.add_argument("--input-path", "--input", dest="input_path", required=True, help="Path to CSV or Parquet directory (e.g., Data/hf10m/Small)")
    parser.add_argument("--output-parquet", default="tiktok10m_sys7_features.parquet", help="Output parquet path")
    parser.add_argument(
        "--lexicons",
        default=None,
        help="Preferred lexicon bundle (default: system7_lexicons.json if present, else system6_lexicons.json).",
    )
    parser.add_argument(
        "--phrase-lexicon",
        default=None,
        help="Optional phrase lexicon JSON from phraseminer.py (uses phrases_by_label to score multi-word phrases).",
    )
    parser.add_argument(
        "--phrase-weight-scale",
        type=float,
        default=1.0,
        help="Scale factor applied to phrase lexicon weights when merged into scoring.",
    )
    parser.add_argument(
        "--system6-lexicons",
        default="system6_lexicons.json",
        help="Fallback System 6 lexicon bundle (used if system7_lexicons.json is missing).",
    )
    parser.add_argument("--slang-lexicon", default="slang_lexicon.json", help="Path to slang_lexicon.json")
    parser.add_argument("--label-orders", default="label_orders.json", help="Path to label_orders.json")
    parser.add_argument("--batch-size", type=int, default=50_000)
    parser.add_argument("--text-cap", type=int, default=1024)
    parser.add_argument("--embedding-model", default="sentence-transformers/all-MiniLM-L6-v2")
    parser.add_argument("--embedding-batch-size", type=int, default=128)
    parser.add_argument("--device", default="cpu", help="Embedding device: cpu or cuda")
    parser.add_argument(
        "--workers",
        type=int,
        default=2,
        help="Number of worker processes for CPU-bound language detection when --auto-language-detect is set (default: 2).",
    )
    parser.add_argument("--metadata-path", default="sys7_miner_metadata.json")
    parser.add_argument("--diagnostics-path", default="sys7_miner_diagnostics.json")
    parser.add_argument("--min-chars", type=int, default=10, help="Minimum clean_text length to keep")
    parser.add_argument("--min-tokens", type=int, default=2, help="Minimum token count to keep")
    parser.add_argument(
        "--emit-flat-scores",
        action="store_true",
        help="If set, emit *_max and per-label scalar columns (e.g., tribe_MomTok) in addition to list score vectors.",
    )
    parser.add_argument(
        "--language-allowlist",
        default=None,
        help="Comma-separated list of language codes to keep (e.g., en,fr). "
        "Combined with optional --auto-language-detect to filter down to core languages.",
    )
    parser.add_argument(
        "--auto-language-detect",
        action="store_true",
        help="If set, auto-detect language for rows with missing/unknown language using langdetect, when available.",
    )
    args = parser.parse_args()
    lexicon_path = args.lexicons
    if not lexicon_path:
        # Default preference: system7_lexicons.json if present, else System 6 bundle.
        default_sys7 = Path("system7_lexicons.json")
        lexicon_path = str(default_sys7) if default_sys7.exists() else args.system6_lexicons
    return MinerConfig(
        input_path=args.input_path,
        output_parquet=args.output_parquet,
        lexicons=lexicon_path,
        phrase_lexicon=args.phrase_lexicon,
        phrase_weight_scale=float(args.phrase_weight_scale),
        slang_lexicon=args.slang_lexicon,
        label_orders=args.label_orders,
        batch_size=args.batch_size,
        text_cap=args.text_cap,
        embedding_model=args.embedding_model,
        embedding_batch_size=args.embedding_batch_size,
        device=args.device,
        workers=args.workers,
        metadata_path=args.metadata_path,
        diagnostics_path=args.diagnostics_path,
        min_chars=args.min_chars,
        min_tokens=args.min_tokens,
        # Normalize "en,FR" -> ["en", "fr"]; None disables language filtering.
        language_allowlist=[s.strip().lower() for s in args.language_allowlist.split(",")] if args.language_allowlist else None,
        auto_language_detect=bool(args.auto_language_detect),
        emit_flat_scores=bool(args.emit_flat_scores),
    )
def main() -> None:
    """CLI entry point: configure logging, then parse args and run the miner."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    )
    run(parse_args())
if __name__ == "__main__":  # pragma: no cover - script entry point
    main()