LordofMonarchs's picture
Upload folder using huggingface_hub
c754148 verified
Raw
History Blame Contribute Delete
27.7 kB
from __future__ import annotations
import argparse
import json
import logging
import math
import os
import pickle
import sys
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple
# Directory layout, resolved relative to this file: its own directory, the
# parent of that directory (treated as the project root), and the project's
# "scripts" directory.
_SRC_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.dirname(_SRC_DIR)
_SCRIPTS_DIR = os.path.join(_PROJECT_ROOT, "scripts")

# Put each directory at the front of sys.path (unless already listed) so the
# function-level project imports further down can be resolved. Because every
# insert goes to index 0, directories handled later take precedence.
for _p in (_SRC_DIR, _SCRIPTS_DIR, _PROJECT_ROOT):
    if _p in sys.path:
        continue
    sys.path.insert(0, _p)
import numpy as np
import pandas as pd
from rank_bm25 import BM25Okapi
def setup_logging(base_dir: str) -> logging.Logger:
    """Configure and return the shared ``"rank"`` logger.

    Creates ``<base_dir>/logs`` if needed and attaches two handlers:

    * a UTF-8 file handler at DEBUG level writing to
      ``logs/rank_<UTC timestamp>.log``;
    * a console handler on ``sys.stdout`` at INFO level.

    The function is safe to call more than once: handlers installed by a
    previous call are removed (and closed) first, so records are never
    emitted twice. Handlers attached by other code are left untouched.

    Args:
        base_dir: Directory under which the ``logs`` folder is created.

    Returns:
        The configured ``logging.Logger`` named ``"rank"``.
    """
    # Local import: the file-level import only brings in `datetime`.
    from datetime import timezone

    logs_dir = os.path.join(base_dir, "logs")
    os.makedirs(logs_dir, exist_ok=True)

    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted UTC stamp is the same.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(logs_dir, f"rank_{timestamp}.log")

    logger = logging.getLogger("rank")
    logger.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object for the same name, so a
    # second call would otherwise stack another pair of handlers on it.
    for stale in list(logger.handlers):
        if getattr(stale, "_rank_pipeline_handler", False):
            logger.removeHandler(stale)
            stale.close()

    fh = logging.FileHandler(log_file, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s [%(name)s] %(message)s",
        datefmt="%H:%M:%S"
    ))

    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)
    ch.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S"
    ))

    for handler in (fh, ch):
        # Marker used above to recognise our own handlers on a later call.
        handler._rank_pipeline_handler = True
        logger.addHandler(handler)

    logger.info("Log file: %s", log_file)
    return logger
def load_artifacts(precomputed_dir: str, logger: logging.Logger):
    """Load every precomputed artifact the ranking run needs.

    For the BM25 scorer and the LightGBM model a fast format is tried first
    (``load_numpy_bm25_artifacts`` / native ``lgbm_model.txt``); if that is
    unavailable the legacy pickle file is used instead. ``candidate_ids.pkl``
    is mandatory. ``static_features.pkl`` is optional: when it is absent or
    unreadable ``None`` is returned in its place.

    A missing mandatory artifact is logged and terminates the process with
    exit status 1.

    NOTE(review): pickle files are deserialized here — only point this at
    artifacts produced by a trusted precompute run.

    Returns:
        ``(bm25, candidate_ids, model, static_features)``.
    """
    from retrieval import load_numpy_bm25_artifacts

    def artifact(name: str) -> str:
        # Absolute location of one artifact file.
        return os.path.join(precomputed_dir, name)

    def require(path: str) -> None:
        # Abort the run when a mandatory artifact is absent.
        if os.path.isfile(path):
            return
        logger.error("Missing artifact: %s — run precompute.py first", path)
        sys.exit(1)

    def unpickle(path: str):
        with open(path, "rb") as handle:
            return pickle.load(handle)

    # --- BM25 scorer -----------------------------------------------------
    bm25 = load_numpy_bm25_artifacts(precomputed_dir)
    if bm25 is None:
        legacy_index = artifact("bm25_index.pkl")
        require(legacy_index)
        bm25 = unpickle(legacy_index)
        logger.info("Stage 0: BM25Okapi loaded (legacy pickle path)")
    else:
        logger.info("Stage 0: NumpyBM25 loaded (fast path)")

    # --- Candidate id list -----------------------------------------------
    ids_file = artifact("candidate_ids.pkl")
    require(ids_file)
    candidate_ids = unpickle(ids_file)

    # --- LightGBM model --------------------------------------------------
    native_model = artifact("lgbm_model.txt")
    pickled_model = artifact("lgbm_model.pkl")
    model = None
    if os.path.isfile(native_model):
        try:
            import lightgbm as lgb
            started = time.time()
            model = lgb.Booster(model_file=native_model)
            logger.info("Stage 0: LightGBM loaded from native text (%.2f s)", time.time() - started)
        except Exception as exc:
            logger.warning("lgbm native load failed (%s), falling back to pickle", exc)
    if model is None:
        require(pickled_model)
        model = unpickle(pickled_model)
        logger.info("Stage 0: LightGBM loaded from pickle (legacy path)")

    # --- Optional static feature cache -----------------------------------
    static_file = artifact("static_features.pkl")
    static_features = None
    if not os.path.isfile(static_file):
        logger.warning("static_features.pkl not found — falling back to live calculation")
    else:
        try:
            started = time.time()
            static_features = unpickle(static_file)
            logger.info("Stage 0: Loaded static features (%d candidates) in %.2fs", len(static_features), time.time() - started)
        except Exception as exc:
            logger.warning("static_features.pkl load failed (%s), falling back to live calculation", exc)

    logger.info(
        "Artifacts loaded: BM25 scorer (%s, %d candidates), LightGBM model",
        type(bm25).__name__,
        len(candidate_ids),
    )
    return bm25, candidate_ids, model, static_features
def load_stage1_candidates(
    candidates_path: str,
    stage1_ids: List[str],
    logger: logging.Logger,
) -> Tuple[List[dict], int]:
    """Stream ``candidates.jsonl`` and keep only the Stage 1 candidates.

    The file is read line by line and reading stops as soon as every
    requested id has been found. Lines that cannot be used are skipped and
    counted instead of aborting the run:

    * lines that are not valid JSON;
    * lines that are valid JSON but not an object (e.g. a list or number),
      which previously crashed on ``.get``.

    Records without a usable ``candidate_id`` are ignored silently. If an id
    appears on several lines, the last occurrence read wins.

    Args:
        candidates_path: Path to the JSONL file (UTF-8).
        stage1_ids: Candidate ids to load; defines the output order.
        logger: Logger for warnings and the summary line.

    Returns:
        ``(candidate_list, malformed_count)`` where ``candidate_list`` follows
        the order of ``stage1_ids`` (ids not found are omitted).
    """
    wanted = set(stage1_ids)
    found: Dict[str, dict] = {}
    malformed_count = 0

    with open(candidates_path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                malformed_count += 1
                logger.warning("Malformed JSON at line %d: %s", line_num, e)
                continue
            if not isinstance(record, dict):
                # Valid JSON, but not a candidate object.
                malformed_count += 1
                logger.warning(
                    "Non-object JSON record at line %d (got %s); skipping",
                    line_num, type(record).__name__,
                )
                continue

            cid = record.get("candidate_id")
            try:
                is_wanted = bool(cid) and cid in wanted
            except TypeError:
                # Unhashable id value (list/dict) can never match a requested id.
                is_wanted = False
            if not is_wanted:
                continue

            found[cid] = record
            if len(found) == len(wanted):
                break  # everything requested has been seen

    if malformed_count > 0:
        logger.warning("Skipped %d malformed JSONL lines during loading", malformed_count)

    missing = wanted - found.keys()
    if missing:
        # Sorted so the logged sample is stable between runs.
        logger.warning(
            "%d stage1 candidates not found in JSONL: %s...",
            len(missing), sorted(missing)[:5]
        )

    ordered = [found[cid] for cid in stage1_ids if cid in found]
    logger.info(
        "Loaded %d stage1 candidates (%d missing, %d malformed)",
        len(ordered), len(missing), malformed_count
    )
    return ordered, malformed_count
def load_stage1_candidates_fast(
    candidates_path: str,
    stage1_ids: List[str],
    offsets: Dict[str, int],
    logger: logging.Logger,
) -> Tuple[List[dict], int]:
    """Load Stage 1 candidate records through a byte-offset index.

    Rather than streaming the whole JSONL file, this performs one
    ``seek`` + ``readline`` per requested id, using ``offsets`` (candidate id
    -> byte offset of that candidate's line).

    A record is rejected and counted as malformed when:

    * the line at the offset is not valid JSON;
    * it parses to something other than an object;
    * its ``candidate_id`` differs from the id requested. This indicates an
      offset index that is stale relative to the JSONL file; without this
      check another candidate's record would silently be returned under the
      wrong id.

    Args:
        candidates_path: Path to the JSONL file.
        stage1_ids: Candidate ids to load; defines the output order.
        offsets: Precomputed id -> byte-offset mapping.
        logger: Logger for warnings and the summary line.

    Returns:
        ``(candidate_list, malformed_count)``. Ids absent from ``offsets``
        are logged as missing and omitted.
    """
    ordered: List[dict] = []
    malformed_count = 0
    missing: List[str] = []

    # Binary mode: offsets are byte positions, not character positions.
    with open(candidates_path, "rb") as f:
        for cid in stage1_ids:
            offset = offsets.get(cid)
            if offset is None:
                missing.append(cid)
                continue
            f.seek(offset)
            raw = f.readline()
            try:
                record = json.loads(raw.decode("utf-8", errors="ignore").strip())
            except json.JSONDecodeError as exc:
                logger.warning("Malformed record at offset %d for %s: %s", offset, cid, exc)
                malformed_count += 1
                continue
            if not isinstance(record, dict):
                logger.warning(
                    "Non-object record at offset %d for %s (got %s)",
                    offset, cid, type(record).__name__,
                )
                malformed_count += 1
                continue
            if record.get("candidate_id") != cid:
                logger.warning(
                    "Offset index mismatch at offset %d: expected %s, found %r",
                    offset, cid, record.get("candidate_id"),
                )
                malformed_count += 1
                continue
            ordered.append(record)

    if missing:
        logger.warning(
            "%d stage1 candidates not in offset index: %s ...",
            len(missing), missing[:5],
        )
    logger.info(
        "Loaded %d stage1 candidates via offset index (%d missing, %d malformed)",
        len(ordered), len(missing), malformed_count,
    )
    return ordered, malformed_count
def extract_features_for_ranking(
    candidates: List[dict],
    jd_config,
    bm25_scores: Dict[str, float],
    stage1_bm25_median: float,
    logger: logging.Logger,
    static_features: Optional[Dict[str, Dict[str, float]]] = None,
) -> Tuple[np.ndarray, List[str], Dict[str, float]]:
    """Build the model input matrix for all Stage 1 candidates.

    Each candidate is turned into one row whose columns follow
    ``features.FEATURE_COLUMNS`` (22 features according to the original
    documentation). If feature extraction raises for a candidate, the failure
    is logged, the row is filled with zeros and its consistency score
    defaults to 1.0, so one bad record cannot abort the run.

    Args:
        candidates: Stage 1 candidate records.
        jd_config: Parsed job-description configuration passed through to
            ``build_feature_vector``.
        bm25_scores: Candidate id -> BM25 score (0.0 when absent).
        stage1_bm25_median: Median BM25 score over the Stage 1 set.
        logger: Logger for warnings and the shape summary.
        static_features: Optional candidate id -> precomputed feature dict.

    Returns:
        ``(X, ordered_ids, consistency_map)`` where ``X`` is a float32 array
        of shape ``(N, len(FEATURE_COLUMNS))``, ``ordered_ids`` gives the
        candidate id of each row, and ``consistency_map`` maps candidate id
        to its ``consistency_score``.
    """
    from features import build_feature_vector, FEATURE_COLUMNS

    n_features = len(FEATURE_COLUMNS)
    feature_rows = []
    ordered_ids = []
    consistency_map = {}
    failed_count = 0

    for candidate in candidates:
        cid = candidate.get("candidate_id", "UNKNOWN")
        bm25_score = bm25_scores.get(cid, 0.0)
        try:
            fv = build_feature_vector(
                candidate, jd_config,
                bm25_score=bm25_score,
                stage1_bm25_median=stage1_bm25_median,
                precomputed_static=static_features.get(cid) if static_features else None
            )
            row = [fv[col] for col in FEATURE_COLUMNS]
            consistency_map[cid] = float(fv.get("consistency_score", 1.0))
        except Exception as e:
            # Deliberate best-effort: keep the candidate with a neutral row.
            logger.warning("Feature extraction failed for %s: %s", cid, e)
            row = [0.0] * n_features
            consistency_map[cid] = 1.0
            failed_count += 1
        feature_rows.append(row)
        ordered_ids.append(cid)

    if failed_count > 0:
        logger.warning("Feature extraction failed for %d candidates (zeroed out)", failed_count)

    if feature_rows:
        X = np.array(feature_rows, dtype=np.float32)
    else:
        # np.array([]) would be 1-D with shape (0,); keep the documented
        # 2-D (N, n_features) contract even when there are no candidates.
        X = np.empty((0, n_features), dtype=np.float32)
    logger.info("Feature matrix: shape=%s", X.shape)
    return X, ordered_ids, consistency_map
def run_lightgbm_inference(
    model,
    X: np.ndarray,
    ordered_ids: List[str],
    logger: logging.Logger,
) -> Dict[str, float]:
    """Score the feature matrix with the model and key the scores by id.

    Calls ``model.predict(X)`` once, logs how long it took, and pairs each
    prediction with the candidate id at the same position in ``ordered_ids``.

    Returns:
        ``{candidate_id: lgbm_score}`` with scores converted to ``float``.
    """
    started = time.time()
    predictions = model.predict(X)
    duration = time.time() - started
    logger.info(
        "LightGBM inference: %d candidates in %.2fs", len(ordered_ids), duration
    )

    scores_by_id: Dict[str, float] = {}
    for cid, prediction in zip(ordered_ids, predictions):
        scores_by_id[cid] = float(prediction)
    return scores_by_id
def _normalize_scores(top_100_raw: List[Tuple[str, float]], logger: logging.Logger) -> List[Tuple[str, float, int]]:
    """Rescale raw scores and attach 1-based ranks.

    The input is taken to be ordered best-first: the first entry supplies the
    maximum and the last entry the minimum. Scores are mapped linearly so the
    maximum becomes 1.0 and the minimum 0.01. When all scores are equal (zero
    spread) the score falls back to ``1.0 - (rank - 1) / 99.0`` so ranks still
    receive decreasing values. Any increase between consecutive normalized
    scores is logged as an error but does not stop processing.

    Returns:
        List of ``(candidate_id, score, rank)`` in rank order; empty input
        yields an empty list.
    """
    if not top_100_raw:
        return []

    score_max = top_100_raw[0][1]
    score_min = top_100_raw[-1][1]
    score_range = score_max - score_min
    has_spread = score_range > 0

    result = []
    for position, (cid, raw_score) in enumerate(top_100_raw):
        rank = position + 1
        if has_spread:
            normalized = 0.01 + 0.99 * (raw_score - score_min) / score_range
        else:
            normalized = 1.0 - (rank - 1) / 99.0
        # Compare with the previously emitted score, if there is one.
        if result and normalized > result[-1][1] + 1e-9:
            logger.error("MONOTONICITY VIOLATION at rank %d", rank)
        result.append((cid, normalized, rank))

    lowest, highest = result[-1][1], result[0][1]
    logger.info("Top 100 selected: score range [%.6f, %.6f]",
                lowest, highest)
    return result
def sort_and_enforce_monotonicity(
    lgbm_scores: Dict[str, float],
    logger: logging.Logger,
) -> List[Tuple[str, float, int]]:
    """Pick the 100 best-scoring candidates and hand them to the normalizer.

    Candidates are ordered by score, highest first; equal scores are ordered
    by ascending candidate_id so the result is deterministic. Only the first
    100 entries are kept and passed to ``_normalize_scores``, which assigns
    the final scores and ranks.

    Returns:
        List of (candidate_id, score, rank) sorted by rank.
    """
    def ordering(entry):
        cid, score = entry
        # Negated score gives descending order; the id breaks ties.
        return (-score, cid)

    by_score = list(lgbm_scores.items())
    by_score.sort(key=ordering)
    return _normalize_scores(by_score[:100], logger)
def assert_monotonicity(ranked: List[Tuple[str, float, int]]) -> None:
    """Verify that scores never increase as rank increases.

    This is the explicit pre-write gate for the ranking. It raises
    ``AssertionError`` directly instead of using the ``assert`` statement,
    because ``assert`` is removed when Python runs with ``-O`` and the gate
    would then silently stop checking anything.

    Args:
        ranked: ``(candidate_id, score, rank)`` tuples in rank order.

    Raises:
        AssertionError: If any score exceeds its predecessor by more than
            the 1e-9 tolerance, or if the comparison involves NaN.
    """
    for idx in range(1, len(ranked)):
        prev_score = ranked[idx - 1][1]
        curr_score = ranked[idx][1]
        # Written as "not (<=)" rather than ">" so that NaN scores, for
        # which every comparison is False, are still rejected.
        if not (curr_score <= prev_score + 1e-9):
            raise AssertionError(
                f"Monotonicity violation: rank {idx} score {prev_score:.8f} "
                f"< rank {idx+1} score {curr_score:.8f}"
            )
def run_honeypot_audit(
    top_100_candidates: List[dict],
    feature_vectors: Dict[str, dict],
    logger: logging.Logger,
) -> None:
    """
    Section 8.1: Pre-Submission Honeypot Audit.

    Counts the candidates whose ``consistency_score`` (looked up in
    ``feature_vectors`` by candidate id, defaulting to 1.0 when absent) is
    below 0.25. Fewer than 10 such candidates passes; 10 or more is logged
    as a failure and terminates the process with exit status 2.
    """
    flagged = 0
    for candidate in top_100_candidates:
        vector = feature_vectors.get(candidate.get("candidate_id", ""), {})
        if vector.get("consistency_score", 1.0) < 0.25:
            flagged += 1

    logger.info(
        "Honeypot audit: %d of 100 candidates have consistency_score < 0.25",
        flagged
    )

    if flagged < 10:
        logger.info("Honeypot audit PASSED.")
        return

    logger.error(
        "HONEYPOT AUDIT FAILED: %d candidates with consistency_score < 0.25 "
        "(threshold: < 10). Pipeline is broken — honeypots bypassed filters.",
        flagged
    )
    sys.exit(2)
def run_diversity_audit(
    top_100_candidates: List[dict],
    feature_vectors: Dict[str, dict],
    logger: logging.Logger,
) -> None:
    """
    Section 8.2: Top 100 Diversity & Homogeneity Audit.

    Delegates to ``validate_pipeline.check_top100_diversity`` with a maximum
    signature share of 0.25 and a maximum single-company share of 0.30,
    prints the resulting report and logs a one-line summary.

    This is a BLOCKING check: when the report does not pass, the flagged
    companies and/or signatures are logged as errors and the process
    terminates with exit status 3.
    """
    from validate_pipeline import check_top100_diversity, print_diversity_report

    report = check_top100_diversity(
        top_100_candidates,
        feature_vectors,
        max_signature_share=0.25,
        max_single_company_share=0.30,
    )
    print_diversity_report(report)

    # Shares are fractions in the report; shown as percentages in the log.
    company_pct = report["most_common_company_share"] * 100
    signature_pct = report["most_common_signature_share"] * 100
    logger.info(
        "Diversity audit: %d distinct archetypes, max_company=%.1f%%, max_sig=%.1f%%",
        report["n_distinct_signatures"],
        company_pct,
        signature_pct,
    )

    if report["pass"]:
        logger.info("Diversity audit PASSED.")
        return

    if report["flagged_companies"]:
        logger.error(
            "DIVERSITY AUDIT FAILED: company concentration too high: %s",
            report["flagged_companies"]
        )
    if report["flagged_signatures"]:
        logger.error(
            "DIVERSITY AUDIT FAILED: archetype signature concentration too high: %s",
            report["flagged_signatures"]
        )
    sys.exit(3)
def write_reasoning_trace(
    top_30_traces: List[dict],
    base_dir: str,
    logger: logging.Logger,
) -> None:
    """Serialize the given traces to ``<base_dir>/reasoning_trace.jsonl``.

    The file is overwritten. Each trace becomes one JSON object per line,
    UTF-8 encoded with non-ASCII characters written as-is.
    """
    trace_path = os.path.join(base_dir, "reasoning_trace.jsonl")
    # Lazily rendered so lines are produced as they are written.
    rendered = (
        json.dumps(entry, ensure_ascii=False) + "\n" for entry in top_30_traces
    )
    with open(trace_path, "w", encoding="utf-8") as sink:
        sink.writelines(rendered)
    logger.info("Reasoning trace written: %s (%d entries)", trace_path, len(top_30_traces))
def pipeline_fn(
    candidates: List[dict],
    jd_config,
    disable_consistency: bool = False,
    disable_param_a: bool = False,
    disable_features: bool = False,
) -> List[str]:
    """Rank a small in-memory candidate set (ablation entry point).

    Pipeline function compatible with ``validate_pipeline.run_ablation``:
    builds a BM25 index over the given candidates, scores them against the
    job-description query, builds feature rows, and ranks with the pickled
    LightGBM model from ``<project root>/precomputed/lgbm_model.pkl``. If the
    model cannot be loaded or applied, ranking falls back to raw BM25 scores
    (now logged as a warning instead of happening silently).

    Candidates without a ``candidate_id`` are identified as ``IDX_<position>``
    consistently in both the BM25 score table and the feature loop; previously
    the feature loop looked them up under ``""`` and always got a BM25 score
    of 0.0.

    Args:
        candidates: Candidate records.
        jd_config: Parsed job description; must provide
            ``get_all_query_terms()`` and ``production_keywords``.
        disable_consistency: Force ``consistency_score`` to 1.0.
        disable_param_a: Force ``Param_A_Systems_Depth`` to 0.0.
        disable_features: Use only the BM25 score, zeroing other features.

    Returns:
        Candidate ids ordered best-first (ties broken by ascending id);
        an empty list for empty input.
    """
    from features import build_feature_vector, FEATURE_COLUMNS
    from precompute import tokenize_candidate
    from retrieval import tokenize_query

    if not candidates:
        # Nothing to index or rank; an empty corpus cannot be scored.
        return []

    log = logging.getLogger("rank")
    cids = [c.get("candidate_id", f"IDX_{i}") for i, c in enumerate(candidates)]

    corpus = [tokenize_candidate(c) for c in candidates]
    bm25 = BM25Okapi(corpus)
    query_tokens = tokenize_query(jd_config.get_all_query_terms() + jd_config.production_keywords)
    raw_scores = bm25.get_scores(query_tokens)
    bm25_scores = {cid: float(score) for cid, score in zip(cids, raw_scores)}
    median_bm25 = float(np.median(list(bm25_scores.values())))

    def bm25_only_row(score: float) -> list:
        # BM25 score in the first slot, zeros elsewhere. Sized from
        # FEATURE_COLUMNS rather than a hard-coded 21.
        # NOTE(review): assumes the BM25 feature is column 0 — confirm.
        return [score] + [0.0] * (len(FEATURE_COLUMNS) - 1)

    feature_rows = []
    for cid, c in zip(cids, candidates):
        bs = bm25_scores.get(cid, 0.0)
        if disable_features:
            feature_rows.append(bm25_only_row(bs))
            continue
        try:
            fv = build_feature_vector(c, jd_config, bs, median_bm25)
            if disable_consistency:
                fv["consistency_score"] = 1.0
            if disable_param_a:
                fv["Param_A_Systems_Depth"] = 0.0
            row = [fv[col] for col in FEATURE_COLUMNS]
        except Exception as exc:
            # Best-effort: keep the candidate with a BM25-only row.
            log.debug("pipeline_fn: feature extraction failed for %s: %s", cid, exc)
            row = bm25_only_row(bs)
        feature_rows.append(row)

    try:
        model_path = os.path.join(_PROJECT_ROOT, "precomputed", "lgbm_model.pkl")
        # NOTE(review): pickle.load executes arbitrary code; the model file
        # must come from a trusted precompute run.
        with open(model_path, "rb") as f:
            model = pickle.load(f)
        X = np.array(feature_rows, dtype=np.float32)
        scores = model.predict(X)
    except Exception as exc:
        log.warning("pipeline_fn: model unavailable (%s); ranking by BM25 only", exc)
        scores = [bm25_scores.get(cid, 0.0) for cid in cids]

    ranked = sorted(
        zip(cids, np.asarray(scores).tolist()),
        key=lambda pair: (-pair[1], pair[0])
    )
    return [cid for cid, _ in ranked]
def main() -> None:
    """Command-line entry point: rank candidates and write the submission CSV.

    Stages, in order:

    0. Load precomputed artifacts (BM25 scorer, candidate ids, LightGBM
       model, optional static features).
    1. Parse the job description and run retrieval to get Stage 1 ids.
    2. Load the Stage 1 candidate records (offset index if present, else a
       full JSONL stream) and build the feature matrix.
    4. Run LightGBM and multiply each score by the candidate's consistency
       score.
    5. Select and normalize the top 100, run the honeypot and diversity
       audits, compile reasoning text, write ``reasoning_trace.jsonl`` and
       the output CSV.

    Validation gates raise ``AssertionError`` explicitly rather than using
    ``assert`` statements, which are removed under ``python -O`` and would
    let the "PASSED" log lines appear without any check having run.

    Exit statuses produced here or by the helpers called: 1 missing artifact,
    2 honeypot audit failed, 3 diversity audit failed, 4 wall-clock time over
    300 s (checked after the CSV has been written).
    """
    parser = argparse.ArgumentParser(
        description="Redrob Candidate Ranking Pipeline",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--candidates",
        required=True,
        help="Path to candidates.jsonl",
    )
    parser.add_argument(
        "--out",
        default="./CTRL_COFFEE_REPEAT.csv",
        help="Path for output CTRL_COFFEE_REPEAT.csv",
    )
    parser.add_argument(
        "--base-dir",
        default=None,
        help="Base directory (defaults to directory containing rank.py)",
    )
    args = parser.parse_args()

    script_dir = _PROJECT_ROOT
    base_dir = os.path.abspath(args.base_dir) if args.base_dir else script_dir
    candidates_path = os.path.abspath(args.candidates)
    out_path = os.path.abspath(args.out)
    precomputed_dir = os.path.join(base_dir, "precomputed")

    logger = setup_logging(base_dir)
    wall_start = time.time()
    logger.info("=" * 60)
    logger.info("REDROB RANKING PIPELINE")
    logger.info("Candidates: %s", candidates_path)
    logger.info("Output: %s", out_path)
    logger.info("Base dir: %s", base_dir)
    logger.info("=" * 60)

    # ---- Stage 0: artifacts ---------------------------------------------
    t0 = time.time()
    bm25, candidate_ids, model, static_features = load_artifacts(precomputed_dir, logger)
    logger.info("Stage 0 (load artifacts): %.2fs", time.time() - t0)

    # ---- Stage 1: retrieval ---------------------------------------------
    t1 = time.time()
    from jd_parser import parse_jd
    from retrieval import run_dual_pass_retrieval
    jd_config = parse_jd(os.path.join(base_dir, "data", "skill_aliases.json"))
    stage1_ids, bm25_scores = run_dual_pass_retrieval(bm25, candidate_ids, jd_config)
    stage1_bm25_scores_list = list(bm25_scores.values())
    stage1_bm25_median = float(np.median(stage1_bm25_scores_list))
    logger.info(
        "Stage 1 (retrieval): %d candidates retrieved, median BM25=%.4f in %.2fs",
        len(stage1_ids), stage1_bm25_median, time.time() - t1
    )

    # ---- Stage 2: load candidate records --------------------------------
    t2 = time.time()
    offsets_path = os.path.join(precomputed_dir, "candidate_offsets.pkl")
    if os.path.isfile(offsets_path):
        with open(offsets_path, "rb") as f:
            candidate_offsets = pickle.load(f)
        stage1_candidates, malformed_count = load_stage1_candidates_fast(
            candidates_path, stage1_ids, candidate_offsets, logger
        )
    else:
        logger.info("Stage 2: offset index not found — streaming full JSONL (slow)")
        stage1_candidates, malformed_count = load_stage1_candidates(
            candidates_path, stage1_ids, logger
        )
    logger.info(
        "Stage 2 (load records): %d candidates loaded (%d malformed) in %.2f s",
        len(stage1_candidates), malformed_count, time.time() - t2
    )

    # ---- Stage 2b: feature matrix ---------------------------------------
    t2b = time.time()
    X, ordered_ids, consistency_map = extract_features_for_ranking(
        stage1_candidates, jd_config, bm25_scores, stage1_bm25_median, logger,
        static_features=static_features
    )
    logger.info("Stage 2b (features): %.2fs", time.time() - t2b)

    # ---- Stage 4: model scores, scaled by consistency -------------------
    t4 = time.time()
    lgbm_scores = run_lightgbm_inference(model, X, ordered_ids, logger)
    for cid in lgbm_scores:
        lgbm_scores[cid] *= consistency_map.get(cid, 1.0)
    logger.info("Stage 4 (LightGBM + multiplier): %.2fs", time.time() - t4)

    # ---- Stage 5: top 100, validation gates -----------------------------
    ranked_top100 = sort_and_enforce_monotonicity(lgbm_scores, logger)
    assert_monotonicity(ranked_top100)
    logger.info("Monotonicity assertion PASSED.")
    if len(ranked_top100) != 100:
        raise AssertionError(
            f"Expected exactly 100 candidates, got {len(ranked_top100)}"
        )
    logger.info("Count assertion PASSED: exactly 100 candidates.")
    top100_ids = [cid for cid, _, _ in ranked_top100]

    # Full feature dicts are needed by the audits and the reasoning
    # compiler, so they are rebuilt here for the top 100 only.
    from features import build_feature_vector, FEATURE_COLUMNS
    candidate_lookup: Dict[str, dict] = {
        c.get("candidate_id"): c for c in stage1_candidates
    }
    feature_vectors: Dict[str, dict] = {}
    for cid in top100_ids:
        c = candidate_lookup.get(cid)
        if c is None:
            feature_vectors[cid] = {col: 0.0 for col in FEATURE_COLUMNS}
            continue
        bs = bm25_scores.get(cid, 0.0)
        try:
            feature_vectors[cid] = build_feature_vector(
                c, jd_config, bs, stage1_bm25_median,
                precomputed_static=static_features.get(cid) if static_features else None
            )
        except Exception:
            # Best-effort, matching the zeroed row used for scoring.
            feature_vectors[cid] = {col: 0.0 for col in FEATURE_COLUMNS}

    top100_candidates = [candidate_lookup[cid] for cid in top100_ids if cid in candidate_lookup]
    run_honeypot_audit(top100_candidates, feature_vectors, logger)
    run_diversity_audit(top100_candidates, feature_vectors, logger)

    # ---- Stage 5: reasoning text ----------------------------------------
    t5r = time.time()
    from reasoning import ReasoningCompiler
    all_lgbm_scores = [lgbm_scores[cid] for cid in top100_ids if cid in lgbm_scores]
    compiler = ReasoningCompiler(jd_config, all_scores=all_lgbm_scores)
    reasoning_texts: Dict[str, str] = {}
    reasoning_traces: List[dict] = []
    for cid, _, rank in ranked_top100:
        c = candidate_lookup.get(cid, {"candidate_id": cid})
        fv = feature_vectors.get(cid, {col: 0.0 for col in FEATURE_COLUMNS})
        raw_lgbm = lgbm_scores.get(cid, 0.0)
        if rank <= 30:
            # Top 30 get a full trace, which is also written to disk.
            trace = compiler.compile_trace(c, fv, raw_lgbm, rank)
            reasoning_traces.append(trace)
            reasoning_texts[cid] = trace["reasoning"]
        else:
            reasoning_texts[cid] = compiler.compile(c, fv, raw_lgbm, rank)
    logger.info("Stage 5 (reasoning): %.2fs", time.time() - t5r)
    write_reasoning_trace(reasoning_traces, base_dir, logger)

    # ---- Output ----------------------------------------------------------
    expected_columns = ["candidate_id", "rank", "score", "reasoning"]
    rows = [
        {
            "candidate_id": cid,
            "rank": rank,
            "score": round(norm_score, 6),
            "reasoning": reasoning_texts.get(cid, ""),
        }
        for cid, norm_score, rank in ranked_top100
    ]
    df = pd.DataFrame(rows, columns=expected_columns)
    if len(df) != 100:
        raise AssertionError(f"DataFrame has {len(df)} rows, expected 100")
    if list(df.columns) != expected_columns:
        raise AssertionError(f"Unexpected columns: {list(df.columns)}")
    # Re-check after rounding; "not (<=)" also rejects NaN.
    scores_arr = df["score"].values
    for i in range(1, len(scores_arr)):
        if not (scores_arr[i] <= scores_arr[i-1] + 1e-9):
            raise AssertionError(
                f"DataFrame monotonicity violation at row {i}: "
                f"{scores_arr[i-1]:.8f} -> {scores_arr[i]:.8f}"
            )
    logger.info("Final DataFrame monotonicity assertion PASSED.")

    # Make sure a missing output directory does not discard a finished run.
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    df.to_csv(out_path, index=False, encoding="utf-8")
    logger.info("Submission CSV written: %s", out_path)

    wall_elapsed = time.time() - wall_start
    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETE")
    logger.info("Wall-clock time: %.2fs (limit: 300s)", wall_elapsed)
    logger.info("Output: %s", out_path)
    logger.info("Candidates ranked: 100")
    logger.info("=" * 60)
    if wall_elapsed > 300:
        logger.error(
            "TIMING VIOLATION: Pipeline took %.1fs > 300s limit", wall_elapsed
        )
        sys.exit(4)

    print("\n--- submission.csv (first 5 rows) ---")
    print(df.head(5).to_string(index=False))
    print(f"\nTotal rows: {len(df)}")
    print(f"Score range: [{df['score'].min():.6f}, {df['score'].max():.6f}]")
    print(f"Wall-clock: {wall_elapsed:.1f}s")
if __name__ == "__main__":
main()