| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import pickle | |
| import sys | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Tuple | |
# Directory layout derived from this file's location: the script lives in
# <project root>/<src dir>/, and helper scripts live in <project root>/scripts.
_SRC_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.dirname(_SRC_DIR)
_SCRIPTS_DIR = os.path.join(_PROJECT_ROOT, "scripts")

# Put each directory at the front of sys.path (unless already present) so that
# project modules imported later by bare name resolve. Every insert goes to
# position 0, so the last directory processed ends up searched first.
for _p in (_SRC_DIR, _SCRIPTS_DIR, _PROJECT_ROOT):
    if _p in sys.path:
        continue
    sys.path.insert(0, _p)
| import numpy as np | |
| import pandas as pd | |
| from rank_bm25 import BM25Okapi | |
def setup_logging(base_dir: str) -> logging.Logger:
    """Configure the ``"rank"`` logger with a per-run log file plus console output.

    Creates ``<base_dir>/logs`` if needed and attaches two handlers:

    * a file handler on ``logs/rank_<UTC timestamp>.log`` at DEBUG level;
    * a stdout stream handler at INFO level.

    Calling this function again (tests, notebooks, long-lived processes)
    replaces the handlers it installed previously instead of stacking new
    ones, so every record is emitted once per destination and stale file
    handles are closed.

    Args:
        base_dir: Directory under which the ``logs`` folder is created.

    Returns:
        The configured ``"rank"`` logger.
    """
    from datetime import timezone

    logs_dir = os.path.join(base_dir, "logs")
    os.makedirs(logs_dir, exist_ok=True)
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC "now".
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(logs_dir, f"rank_{timestamp}.log")

    logger = logging.getLogger("rank")
    logger.setLevel(logging.DEBUG)

    # Remove (and close) handlers installed by an earlier call so repeated
    # setup does not duplicate every log line or leak file descriptors.
    # Only our own handlers (tagged below) are touched.
    for old in list(logger.handlers):
        if getattr(old, "_rank_setup_handler", False):
            logger.removeHandler(old)
            old.close()

    fh = logging.FileHandler(log_file, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s [%(name)s] %(message)s",
        datefmt="%H:%M:%S",
    ))

    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)
    ch.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    ))

    for handler in (fh, ch):
        handler._rank_setup_handler = True  # marker used by the cleanup above
        logger.addHandler(handler)

    logger.info("Log file: %s", log_file)
    return logger
def _require_artifact(path: str, logger: logging.Logger) -> str:
    """Return ``path`` if it is an existing file; otherwise log an error and exit(1)."""
    if not os.path.isfile(path):
        logger.error("Missing artifact: %s — run precompute.py first", path)
        sys.exit(1)
    return path


def _load_bm25_scorer(precomputed_dir: str, logger: logging.Logger):
    """Return the NumPy BM25 scorer when built, else the legacy pickled BM25Okapi."""
    from retrieval import load_numpy_bm25_artifacts

    scorer = load_numpy_bm25_artifacts(precomputed_dir)
    if scorer is not None:
        logger.info("Stage 0: NumpyBM25 loaded (fast path)")
        return scorer
    legacy_path = _require_artifact(os.path.join(precomputed_dir, "bm25_index.pkl"), logger)
    with open(legacy_path, "rb") as f:
        scorer = pickle.load(f)
    logger.info("Stage 0: BM25Okapi loaded (legacy pickle path)")
    return scorer


def _load_lgbm_model(precomputed_dir: str, logger: logging.Logger):
    """Return the LightGBM model, preferring the native text dump over the pickle."""
    native_path = os.path.join(precomputed_dir, "lgbm_model.txt")
    model = None
    if os.path.isfile(native_path):
        try:
            import lightgbm as lgb
            t0 = time.time()
            model = lgb.Booster(model_file=native_path)
            logger.info("Stage 0: LightGBM loaded from native text (%.2f s)", time.time() - t0)
        except Exception as exc:
            logger.warning("lgbm native load failed (%s), falling back to pickle", exc)
    if model is None:
        pickle_path = _require_artifact(os.path.join(precomputed_dir, "lgbm_model.pkl"), logger)
        with open(pickle_path, "rb") as f:
            model = pickle.load(f)
        logger.info("Stage 0: LightGBM loaded from pickle (legacy path)")
    return model


def _load_static_features(precomputed_dir: str, logger: logging.Logger):
    """Return the precomputed ``{candidate_id: features}`` dict, or None to compute live."""
    static_path = os.path.join(precomputed_dir, "static_features.pkl")
    if not os.path.isfile(static_path):
        logger.warning("static_features.pkl not found — falling back to live calculation")
        return None
    try:
        t0 = time.time()
        with open(static_path, "rb") as f:
            loaded = pickle.load(f)
        # Callers look entries up with .get(cid); reject anything that is not a dict.
        if not isinstance(loaded, dict):
            raise TypeError(f"expected dict, got {type(loaded).__name__}")
        logger.info(
            "Stage 0: Loaded static features (%d candidates) in %.2fs",
            len(loaded), time.time() - t0,
        )
    except Exception as exc:
        # Return None so the caller genuinely falls back, as the warning says.
        logger.warning("static_features.pkl load failed (%s), falling back to live calculation", exc)
        return None
    return loaded


def load_artifacts(precomputed_dir: str, logger: logging.Logger):
    """Load the BM25 scorer, candidate IDs, LightGBM model and optional static features.

    Fast NumPy / native-format artifacts are tried first; pickle files are the
    fallback when the fast artifacts have not been built (backward-compatible).
    A missing required artifact logs an error and exits with status 1.

    NOTE: the ``.pkl`` artifacts are unpickled without validation, so they must
    come from a trusted precompute run — never point this at external files.

    Args:
        precomputed_dir: Directory holding the precompute outputs.
        logger: Logger for progress and error messages.

    Returns:
        ``(bm25, candidate_ids, model, static_features)``. ``static_features``
        is None when the file is absent or unusable.
    """
    bm25 = _load_bm25_scorer(precomputed_dir, logger)

    ids_path = _require_artifact(os.path.join(precomputed_dir, "candidate_ids.pkl"), logger)
    with open(ids_path, "rb") as f:
        candidate_ids = pickle.load(f)

    model = _load_lgbm_model(precomputed_dir, logger)
    static_features = _load_static_features(precomputed_dir, logger)

    logger.info(
        "Artifacts loaded: BM25 scorer (%s, %d candidates), LightGBM model",
        type(bm25).__name__,
        len(candidate_ids),
    )
    return bm25, candidate_ids, model, static_features
def load_stage1_candidates(
    candidates_path: str,
    stage1_ids: List[str],
    logger: logging.Logger,
) -> Tuple[List[dict], int]:
    """
    Stream-read candidates.jsonl and return only the Stage 1 candidates.

    Blank lines are skipped. Lines that are not valid JSON, or that decode to
    something other than a JSON object, are counted as malformed and skipped.
    Records whose ``candidate_id`` is missing, falsy or unhashable are ignored.
    Undecodable bytes are dropped (``errors="ignore"``, as in
    ``load_stage1_candidates_fast``), so one bad byte cannot abort the stream.
    Reading stops as soon as every requested ID has been seen.

    Args:
        candidates_path: Path to the JSONL file (one candidate object per line).
        stage1_ids: IDs to keep; the returned list follows this order.
        logger: Destination for per-line warnings and the summary line.

    Returns:
        (candidate_list, malformed_count). IDs that were never found are omitted
        from ``candidate_list``. If an ID appears on several lines, the last one
        read before stopping is kept.
    """
    stage1_set = set(stage1_ids)
    found: Dict[str, dict] = {}
    malformed_count = 0

    if stage1_set:  # nothing to look for -> skip reading the whole file
        with open(candidates_path, "r", encoding="utf-8", errors="ignore") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    c = json.loads(line)
                except json.JSONDecodeError as e:
                    malformed_count += 1
                    logger.warning("Malformed JSON at line %d: %s", line_num, e)
                    continue
                if not isinstance(c, dict):
                    malformed_count += 1
                    logger.warning(
                        "Non-object JSON record at line %d (%s)", line_num, type(c).__name__
                    )
                    continue
                cid = c.get("candidate_id")
                # Lists/objects are unhashable and can never be Stage 1 IDs.
                if not cid or isinstance(cid, (list, dict)) or cid not in stage1_set:
                    continue
                found[cid] = c
                if len(found) == len(stage1_set):
                    break

    if malformed_count > 0:
        logger.warning("Skipped %d malformed JSONL lines during loading", malformed_count)
    missing = stage1_set - set(found)
    if missing:
        # Sort for a reproducible sample in the logs (set order is arbitrary).
        logger.warning(
            "%d stage1 candidates not found in JSONL: %s...",
            len(missing), sorted(missing, key=str)[:5]
        )
    ordered = [found[cid] for cid in stage1_ids if cid in found]
    logger.info(
        "Loaded %d stage1 candidates (%d missing, %d malformed)",
        len(ordered), len(missing), malformed_count
    )
    return ordered, malformed_count
def load_stage1_candidates_fast(
    candidates_path: str,
    stage1_ids: List[str],
    offsets: Dict[str, int],
    logger: logging.Logger,
) -> Tuple[List[dict], int]:
    """
    Load Stage 1 candidate records using a precomputed byte-offset index.

    Instead of streaming all of candidates.jsonl, this performs one
    ``f.seek()`` + ``f.readline()`` per requested candidate, reading only
    the lines it needs.

    A record is accepted only if it is a JSON object whose ``candidate_id``
    equals the ID it was looked up by. A mismatch means the offset index is
    stale (the JSONL changed after it was built); such IDs are reported as
    missing rather than returning another candidate's record. Non-JSON or
    non-object lines count as malformed.

    Args:
        candidates_path: Path to the JSONL file the offsets were built from.
        stage1_ids: IDs to load; the returned list follows this order.
        offsets: Mapping of candidate_id -> byte offset of its line.
        logger: Destination for warnings and the summary line.

    Returns:
        (candidate_list, malformed_count)
    """
    ordered: List[dict] = []
    malformed_count = 0
    missing: List[str] = []
    stale: List[str] = []
    with open(candidates_path, "rb") as f:
        for cid in stage1_ids:
            offset = offsets.get(cid)
            if offset is None:
                missing.append(cid)
                continue
            f.seek(offset)
            raw = f.readline()
            try:
                c = json.loads(raw.decode("utf-8", errors="ignore").strip())
            except json.JSONDecodeError as exc:
                logger.warning("Malformed record at offset %d for %s: %s", offset, cid, exc)
                malformed_count += 1
                continue
            if not isinstance(c, dict):
                logger.warning(
                    "Non-object record at offset %d for %s (%s)", offset, cid, type(c).__name__
                )
                malformed_count += 1
                continue
            if c.get("candidate_id") != cid:
                # Stale index: this offset now points at a different record.
                stale.append(cid)
                continue
            ordered.append(c)
    if missing:
        logger.warning(
            "%d stage1 candidates not in offset index: %s ...",
            len(missing), missing[:5],
        )
    if stale:
        logger.warning(
            "%d stage1 candidates have stale offsets (record ID mismatch): %s ... "
            "— rebuild the offset index",
            len(stale), stale[:5],
        )
    logger.info(
        "Loaded %d stage1 candidates via offset index (%d missing, %d malformed)",
        len(ordered), len(missing) + len(stale), malformed_count,
    )
    return ordered, malformed_count
def extract_features_for_ranking(
    candidates: List[dict],
    jd_config,
    bm25_scores: Dict[str, float],
    stage1_bm25_median: float,
    logger: logging.Logger,
    static_features: Optional[Dict[str, Dict[str, float]]] = None,
) -> Tuple[np.ndarray, List[str], Dict[str, float]]:
    """
    Build the model feature matrix for all Stage 1 candidates.

    Each candidate's row is ``build_feature_vector(...)`` projected onto
    ``features.FEATURE_COLUMNS``. If feature extraction fails for a candidate,
    the failure is logged, its row is all zeros and its consistency is 1.0.

    Args:
        candidates: Candidate records; a record without ``candidate_id`` is
            identified as ``"UNKNOWN"``.
        jd_config: Parsed job-description config passed through to the
            feature builder.
        bm25_scores: ``{candidate_id: bm25_score}``; absent IDs score 0.0.
        stage1_bm25_median: Median Stage 1 BM25 score passed to the builder.
        logger: Destination for warnings and the shape summary.
        static_features: Optional per-candidate precomputed static features.

    Returns:
        (X, ordered_ids, consistency_map) where X has shape
        ``(len(candidates), len(FEATURE_COLUMNS))`` and dtype float32. The
        shape holds even for empty input, and row i belongs to ordered_ids[i].
    """
    from features import build_feature_vector, FEATURE_COLUMNS

    n_features = len(FEATURE_COLUMNS)
    feature_rows: List[List[float]] = []
    ordered_ids: List[str] = []
    consistency_map: Dict[str, float] = {}
    failed_count = 0
    for candidate in candidates:
        cid = candidate.get("candidate_id", "UNKNOWN")
        bm25_score = bm25_scores.get(cid, 0.0)
        try:
            fv = build_feature_vector(
                candidate, jd_config,
                bm25_score=bm25_score,
                stage1_bm25_median=stage1_bm25_median,
                precomputed_static=static_features.get(cid) if static_features else None,
            )
            row = [fv[col] for col in FEATURE_COLUMNS]
            consistency_map[cid] = float(fv.get("consistency_score", 1.0))
        except Exception as e:
            # Best effort: one bad record must not abort ranking of the rest.
            logger.warning("Feature extraction failed for %s: %s", cid, e)
            row = [0.0] * n_features
            consistency_map[cid] = 1.0
            failed_count += 1
        feature_rows.append(row)
        ordered_ids.append(cid)
    if failed_count > 0:
        logger.warning("Feature extraction failed for %d candidates (zeroed out)", failed_count)
    # np.array([]) would be 1-D with shape (0,); keep the matrix 2-D so that
    # model.predict always receives the expected number of columns.
    X = np.array(feature_rows, dtype=np.float32).reshape(len(feature_rows), n_features)
    logger.info("Feature matrix: shape=%s", X.shape)
    return X, ordered_ids, consistency_map
def run_lightgbm_inference(
    model,
    X: np.ndarray,
    ordered_ids: List[str],
    logger: logging.Logger,
) -> Dict[str, float]:
    """
    Run ``model.predict`` on the feature matrix and key the scores by ID.

    Args:
        model: Any object with a ``predict(X)`` method returning one score per
            row (a LightGBM Booster in this pipeline).
        X: Feature matrix whose row i belongs to ``ordered_ids[i]``.
        ordered_ids: Candidate IDs aligned with the rows of ``X``.
        logger: Destination for timing information.

    Returns:
        {candidate_id: score}. Empty when there are no candidates, in which
        case the model is not called.

    Raises:
        ValueError: if the number of rows in ``X`` differs from
            ``len(ordered_ids)``. Without this check, ``zip`` would silently
            drop or misalign scores.
    """
    if X.shape[0] != len(ordered_ids):
        raise ValueError(
            f"Feature matrix has {X.shape[0]} rows but {len(ordered_ids)} candidate IDs were given"
        )
    if not ordered_ids:
        logger.warning("LightGBM inference skipped: no candidates to score")
        return {}
    t0 = time.time()
    raw_scores = model.predict(X)
    elapsed = time.time() - t0
    logger.info(
        "LightGBM inference: %d candidates in %.2fs", len(ordered_ids), elapsed
    )
    return {cid: float(score) for cid, score in zip(ordered_ids, raw_scores)}
def _normalize_scores(top_100_raw: List[Tuple[str, float]], logger: logging.Logger) -> List[Tuple[str, float, int]]:
    """
    Min-max normalize ranked raw scores into [0.01, 1.0] and assign ranks 1..N.

    The input order defines the ranks and is expected to be best-first, as
    produced by ``sort_and_enforce_monotonicity``. The highest raw score maps
    to 1.0 and the lowest to 0.01. When all raw scores are equal, scores
    instead step down linearly from 1.0 (rank 1) to 0.0 (rank N), which keeps
    the ranks distinguishable. A score that rises relative to its predecessor
    (only possible for unsorted input) is logged as a monotonicity violation.

    Returns:
        List of (candidate_id, normalized_score, rank) in input order; empty for
        empty input.
    """
    if not top_100_raw:
        return []
    raw_values = [score for _, score in top_100_raw]
    # True extremes rather than first/last element: stays in range even if
    # the caller hands in an unsorted list.
    score_min = min(raw_values)
    score_max = max(raw_values)
    score_range = score_max - score_min
    # Tie ramp denominator is N-1 (not a fixed 99) so scores stay in [0, 1]
    # for any list length; identical to the old behavior when N == 100.
    tie_denominator = max(len(top_100_raw) - 1, 1)

    result = []
    prev_normalized = None
    for rank, (cid, raw_score) in enumerate(top_100_raw, 1):
        if score_range > 0:
            normalized = 0.01 + 0.99 * (raw_score - score_min) / score_range
        else:
            normalized = 1.0 - (rank - 1) / tie_denominator
        if prev_normalized is not None and normalized > prev_normalized + 1e-9:
            logger.error("MONOTONICITY VIOLATION at rank %d", rank)
        prev_normalized = normalized
        result.append((cid, normalized, rank))
    logger.info("Top 100 selected: score range [%.6f, %.6f]",
                result[-1][1], result[0][1])
    return result
def sort_and_enforce_monotonicity(
    lgbm_scores: Dict[str, float],
    logger: logging.Logger,
    top_k: int = 100,
) -> List[Tuple[str, float, int]]:
    """
    Rank candidates by score, keep the best ``top_k`` and normalize their scores.

    Candidates are sorted by score descending, with ties broken by ascending
    candidate_id so the output is deterministic. NaN scores are ranked last:
    NaN compares False against everything, so ``sorted`` would otherwise
    produce an arbitrary order. The kept slice is passed to
    ``_normalize_scores``, which assigns ranks 1..N.

    Args:
        lgbm_scores: {candidate_id: raw score}.
        logger: Passed through to the normalizer.
        top_k: Number of candidates to keep (default 100).

    Returns:
        List of (candidate_id, normalized_score, rank) sorted by rank.
    """
    def _sort_key(item: Tuple[str, float]):
        cid, score = item
        return (math.inf if math.isnan(score) else -score, cid)

    sorted_candidates = sorted(lgbm_scores.items(), key=_sort_key)
    return _normalize_scores(sorted_candidates[:top_k], logger)
def assert_monotonicity(ranked: List[Tuple[str, float, int]]) -> None:
    """
    Check at runtime that scores are monotonically non-increasing by rank.

    This runs BEFORE the CSV is written, rather than relying on the sort
    alone. The check raises explicitly instead of using the ``assert``
    statement, so it still runs under ``python -O``. A tolerance of 1e-9 is
    allowed, and a NaN score counts as a violation.

    Args:
        ranked: (candidate_id, score, rank) tuples ordered by rank 1..N.

    Raises:
        AssertionError: on the first adjacent pair where the score increases.
    """
    for rank, (prev_entry, curr_entry) in enumerate(zip(ranked, ranked[1:]), 1):
        prev_score = prev_entry[1]
        curr_score = curr_entry[1]
        if not curr_score <= prev_score + 1e-9:
            raise AssertionError(
                f"Monotonicity violation: rank {rank} score {prev_score:.8f} "
                f"< rank {rank + 1} score {curr_score:.8f}"
            )
def run_honeypot_audit(
    top_100_candidates: List[dict],
    feature_vectors: Dict[str, dict],
    logger: logging.Logger,
    threshold: float = 0.25,
    max_allowed: int = 10,
) -> None:
    """
    Section 8.1: Pre-Submission Honeypot Audit.

    Counts the candidates whose ``consistency_score`` is below ``threshold``
    and requires that count to stay below ``max_allowed``. With the defaults,
    fewer than 10 candidates may score under 0.25. On failure the process
    exits with status 2.

    A candidate with no feature vector, no ``consistency_score``, or a value
    that cannot be converted to float is treated as consistent (1.0). This is
    the same default used for a missing key.

    Args:
        top_100_candidates: Candidate records (only ``candidate_id`` is read).
        feature_vectors: {candidate_id: feature dict}.
        logger: Destination for the audit result.
        threshold: Consistency score below which a candidate is suspicious.
        max_allowed: Failing count; the audit passes only if count < max_allowed.
    """
    def _consistency(candidate: dict) -> float:
        fv = feature_vectors.get(candidate.get("candidate_id", "")) or {}
        try:
            return float(fv.get("consistency_score", 1.0))
        except (TypeError, ValueError):
            return 1.0

    low_consistency_count = sum(
        1 for c in top_100_candidates if _consistency(c) < threshold
    )
    logger.info(
        "Honeypot audit: %d of %d candidates have consistency_score < %g",
        low_consistency_count, len(top_100_candidates), threshold,
    )
    if low_consistency_count >= max_allowed:
        logger.error(
            "HONEYPOT AUDIT FAILED: %d candidates with consistency_score < %g "
            "(threshold: < %d). Pipeline is broken — honeypots bypassed filters.",
            low_consistency_count, threshold, max_allowed,
        )
        sys.exit(2)
    logger.info("Honeypot audit PASSED.")
def run_diversity_audit(
    top_100_candidates: List[dict],
    feature_vectors: Dict[str, dict],
    logger: logging.Logger,
) -> None:
    """
    Section 8.2: Top 100 Diversity & Homogeneity Audit (blocking).

    Delegates to ``validate_pipeline.check_top100_diversity``, allowing at most
    a 0.25 share for any one archetype signature and 0.30 for any one company.
    The report is printed and summarized in the log. If the report does not
    pass, each flagged category is logged as an error and the process exits
    with status 3.
    """
    from validate_pipeline import check_top100_diversity, print_diversity_report

    audit = check_top100_diversity(
        top_100_candidates,
        feature_vectors,
        max_signature_share=0.25,
        max_single_company_share=0.30,
    )
    print_diversity_report(audit)
    logger.info(
        "Diversity audit: %d distinct archetypes, max_company=%.1f%%, max_sig=%.1f%%",
        audit["n_distinct_signatures"],
        100 * audit["most_common_company_share"],
        100 * audit["most_common_signature_share"],
    )

    if audit["pass"]:
        logger.info("Diversity audit PASSED.")
        return

    # Report every flagged category (companies first) before failing.
    failure_messages = (
        ("flagged_companies",
         "DIVERSITY AUDIT FAILED: company concentration too high: %s"),
        ("flagged_signatures",
         "DIVERSITY AUDIT FAILED: archetype signature concentration too high: %s"),
    )
    for report_key, message in failure_messages:
        if audit[report_key]:
            logger.error(message, audit[report_key])
    sys.exit(3)
def write_reasoning_trace(
    top_30_traces: List[dict],
    base_dir: str,
    logger: logging.Logger,
) -> None:
    """Write the traces as UTF-8 JSON Lines to ``<base_dir>/reasoning_trace.jsonl``.

    The file is overwritten, with one ``json.dumps`` object per line and
    non-ASCII characters kept as-is (``ensure_ascii=False``).
    """
    trace_path = os.path.join(base_dir, "reasoning_trace.jsonl")
    with open(trace_path, "w", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(entry, ensure_ascii=False) + "\n" for entry in top_30_traces
        )
    logger.info("Reasoning trace written: %s (%d entries)", trace_path, len(top_30_traces))
def pipeline_fn(
    candidates: List[dict],
    jd_config,
    disable_consistency: bool = False,
    disable_param_a: bool = False,
    disable_features: bool = False,
) -> List[str]:
    """
    Pipeline function compatible with validate_pipeline.run_ablation.

    Builds an in-memory BM25 index over ``candidates`` (intended for small
    candidate sets) and computes the feature matrix. The matrix is scored with
    the pickled model at ``<project root>/precomputed/lgbm_model.pkl``, and
    candidate IDs are returned best-first, with ties broken by ascending ID.
    Candidates without a ``candidate_id`` are identified as ``IDX_<position>``.

    Ablation switches:
        disable_consistency: force the ``consistency_score`` feature to 1.0.
        disable_param_a: force ``Param_A_Systems_Depth`` to 0.0.
        disable_features: use the BM25-only fallback row for every candidate.

    If the model cannot be loaded or scoring fails, ranking falls back to the
    BM25 scores and a warning is logged on the ``"rank"`` logger.

    NOTE(review): unlike ``main``, the consistency multiplier is not applied to
    the model scores here. Confirm this is intended for ablation runs.
    """
    if not candidates:
        # BM25Okapi cannot index an empty corpus; there is nothing to rank.
        return []

    from features import build_feature_vector, FEATURE_COLUMNS
    from precompute import tokenize_candidate
    from retrieval import tokenize_query

    cids = [c.get("candidate_id", f"IDX_{i}") for i, c in enumerate(candidates)]
    bm25 = BM25Okapi([tokenize_candidate(c) for c in candidates])
    query_tokens = tokenize_query(jd_config.get_all_query_terms() + jd_config.production_keywords)
    raw_scores = bm25.get_scores(query_tokens)
    bm25_scores = {cid: float(score) for cid, score in zip(cids, raw_scores)}
    median_bm25 = float(np.median(list(bm25_scores.values())))

    # Fallback rows put the BM25 score in column 0 and zero the rest
    # (presumably FEATURE_COLUMNS[0] is the BM25 feature — TODO confirm). Their
    # width comes from FEATURE_COLUMNS so it always matches the model's input.
    n_zero_cols = len(FEATURE_COLUMNS) - 1
    feature_rows = []
    # Pair each record with the same ID used for BM25, so ID-less candidates
    # look up their own score instead of a "" key that yields 0.0.
    for cid, c in zip(cids, candidates):
        bs = bm25_scores.get(cid, 0.0)
        fallback_row = [bs] + [0.0] * n_zero_cols
        if disable_features:
            feature_rows.append(fallback_row)
            continue
        try:
            fv = build_feature_vector(c, jd_config, bs, median_bm25)
            if disable_consistency:
                fv["consistency_score"] = 1.0
            if disable_param_a:
                fv["Param_A_Systems_Depth"] = 0.0
            feature_rows.append([fv[col] for col in FEATURE_COLUMNS])
        except Exception:
            feature_rows.append(fallback_row)

    try:
        # Trusted, locally produced artifact (pickle is unsafe on external data).
        model_path = os.path.join(_PROJECT_ROOT, "precomputed", "lgbm_model.pkl")
        with open(model_path, "rb") as f:
            model = pickle.load(f)
        scores = np.asarray(model.predict(np.array(feature_rows, dtype=np.float32)))
    except Exception as exc:
        logging.getLogger("rank").warning(
            "pipeline_fn: LightGBM scoring unavailable (%s); ranking by BM25", exc
        )
        scores = np.array([bm25_scores.get(cid, 0.0) for cid in cids])

    ranked = sorted(zip(cids, scores.tolist()), key=lambda x: (-x[1], x[0]))
    return [cid for cid, _ in ranked]
def main() -> None:
    """Command-line entry point: rank candidates and write the submission CSV.

    Stages:
        0. Load precomputed artifacts.
        1. BM25 retrieval.
        2. Load the Stage 1 records.
        2b. Feature extraction.
        4. LightGBM scoring multiplied by each candidate's consistency score.
        5. Top-100 selection and normalization, honeypot and diversity audits,
           reasoning generation, then the CSV write.

    Exit codes (from ``sys.exit`` here and in the helpers it calls):
        1 missing artifact, 2 honeypot audit, 3 diversity audit, 4 time limit.

    Structural checks raise AssertionError explicitly, so they still run under
    ``python -O``.
    """
    parser = argparse.ArgumentParser(
        description="Redrob Candidate Ranking Pipeline",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--candidates",
        required=True,
        help="Path to candidates.jsonl",
    )
    parser.add_argument(
        "--out",
        default="./CTRL_COFFEE_REPEAT.csv",
        help="Path for output CTRL_COFFEE_REPEAT.csv",
    )
    parser.add_argument(
        "--base-dir",
        default=None,
        help=(
            "Base directory holding precomputed/, data/ and logs/ "
            "(defaults to the project root: the parent of this script's directory)"
        ),
    )
    args = parser.parse_args()

    base_dir = os.path.abspath(args.base_dir) if args.base_dir else _PROJECT_ROOT
    candidates_path = os.path.abspath(args.candidates)
    out_path = os.path.abspath(args.out)
    precomputed_dir = os.path.join(base_dir, "precomputed")

    logger = setup_logging(base_dir)
    wall_start = time.time()
    logger.info("=" * 60)
    logger.info("REDROB RANKING PIPELINE")
    logger.info("Candidates: %s", candidates_path)
    logger.info("Output: %s", out_path)
    logger.info("Base dir: %s", base_dir)
    logger.info("=" * 60)

    # Stage 0: precomputed artifacts (load_artifacts exits 1 if one is missing).
    t0 = time.time()
    bm25, candidate_ids, model, static_features = load_artifacts(precomputed_dir, logger)
    logger.info("Stage 0 (load artifacts): %.2fs", time.time() - t0)

    # Stage 1: BM25 retrieval.
    t1 = time.time()
    from jd_parser import parse_jd
    from retrieval import run_dual_pass_retrieval
    jd_config = parse_jd(os.path.join(base_dir, "data", "skill_aliases.json"))
    stage1_ids, bm25_scores = run_dual_pass_retrieval(bm25, candidate_ids, jd_config)
    stage1_bm25_scores_list = list(bm25_scores.values())
    # np.median([]) is nan (with a RuntimeWarning); use a neutral 0.0 instead.
    stage1_bm25_median = (
        float(np.median(stage1_bm25_scores_list)) if stage1_bm25_scores_list else 0.0
    )
    logger.info(
        "Stage 1 (retrieval): %d candidates retrieved, median BM25=%.4f in %.2fs",
        len(stage1_ids), stage1_bm25_median, time.time() - t1
    )

    # Stage 2: load the Stage 1 records, via the byte-offset index when usable.
    t2 = time.time()
    offsets_path = os.path.join(precomputed_dir, "candidate_offsets.pkl")
    candidate_offsets = None
    if os.path.isfile(offsets_path):
        try:
            with open(offsets_path, "rb") as f:
                candidate_offsets = pickle.load(f)
        except Exception as exc:
            # A corrupt index only costs speed; stream the JSONL instead.
            logger.warning("Stage 2: offset index unreadable (%s)", exc)
    if candidate_offsets is not None:
        stage1_candidates, malformed_count = load_stage1_candidates_fast(
            candidates_path, stage1_ids, candidate_offsets, logger
        )
    else:
        logger.info("Stage 2: offset index unavailable — streaming full JSONL (slow)")
        stage1_candidates, malformed_count = load_stage1_candidates(
            candidates_path, stage1_ids, logger
        )
    logger.info(
        "Stage 2 (load records): %d candidates loaded (%d malformed) in %.2f s",
        len(stage1_candidates), malformed_count, time.time() - t2
    )

    # Stage 2b: feature matrix.
    t2b = time.time()
    X, ordered_ids, consistency_map = extract_features_for_ranking(
        stage1_candidates, jd_config, bm25_scores, stage1_bm25_median, logger,
        static_features=static_features
    )
    logger.info("Stage 2b (features): %.2fs", time.time() - t2b)

    # Stage 4: model scores scaled by each candidate's consistency score.
    t4 = time.time()
    lgbm_scores = run_lightgbm_inference(model, X, ordered_ids, logger)
    for cid in lgbm_scores:
        lgbm_scores[cid] *= consistency_map.get(cid, 1.0)
    logger.info("Stage 4 (LightGBM + multiplier): %.2fs", time.time() - t4)

    # Stage 5: top-100 selection and structural checks.
    ranked_top100 = sort_and_enforce_monotonicity(lgbm_scores, logger)
    assert_monotonicity(ranked_top100)
    logger.info("Monotonicity assertion PASSED.")
    if len(ranked_top100) != 100:
        raise AssertionError(f"Expected exactly 100 candidates, got {len(ranked_top100)}")
    logger.info("Count assertion PASSED: exactly 100 candidates.")

    top100_ids = [cid for cid, _, _ in ranked_top100]
    from features import build_feature_vector, FEATURE_COLUMNS
    candidate_lookup: Dict[str, dict] = {
        c.get("candidate_id"): c for c in stage1_candidates
    }
    # Full feature dicts for the top 100 (the audits and reasoning need
    # named features, not only the model matrix).
    feature_vectors: Dict[str, dict] = {}
    for cid in top100_ids:
        c = candidate_lookup.get(cid)
        if c is None:
            feature_vectors[cid] = {col: 0.0 for col in FEATURE_COLUMNS}
            continue
        bs = bm25_scores.get(cid, 0.0)
        try:
            feature_vectors[cid] = build_feature_vector(
                c, jd_config, bs, stage1_bm25_median,
                precomputed_static=static_features.get(cid) if static_features else None
            )
        except Exception:
            feature_vectors[cid] = {col: 0.0 for col in FEATURE_COLUMNS}

    top100_candidates = [candidate_lookup[cid] for cid in top100_ids if cid in candidate_lookup]
    run_honeypot_audit(top100_candidates, feature_vectors, logger)
    run_diversity_audit(top100_candidates, feature_vectors, logger)

    # Reasoning text for every row; structured traces for the top 30.
    t5r = time.time()
    from reasoning import ReasoningCompiler
    all_lgbm_scores = [lgbm_scores[cid] for cid in top100_ids if cid in lgbm_scores]
    compiler = ReasoningCompiler(jd_config, all_scores=all_lgbm_scores)
    reasoning_texts: Dict[str, str] = {}
    reasoning_traces: List[dict] = []
    for cid, _norm_score, rank in ranked_top100:
        c = candidate_lookup.get(cid, {"candidate_id": cid})
        fv = feature_vectors.get(cid, {col: 0.0 for col in FEATURE_COLUMNS})
        raw_lgbm = lgbm_scores.get(cid, 0.0)
        if rank <= 30:
            trace = compiler.compile_trace(c, fv, raw_lgbm, rank)
            reasoning_traces.append(trace)
            reasoning_texts[cid] = trace["reasoning"]
        else:
            reasoning_texts[cid] = compiler.compile(c, fv, raw_lgbm, rank)
    logger.info("Stage 5 (reasoning): %.2fs", time.time() - t5r)
    write_reasoning_trace(reasoning_traces, base_dir, logger)

    # Build and validate the submission before writing it.
    columns = ["candidate_id", "rank", "score", "reasoning"]
    rows = [
        {
            "candidate_id": cid,
            "rank": rank,
            "score": round(norm_score, 6),
            "reasoning": reasoning_texts.get(cid, ""),
        }
        for cid, norm_score, rank in ranked_top100
    ]
    df = pd.DataFrame(rows, columns=columns)
    if len(df) != 100:
        raise AssertionError(f"DataFrame has {len(df)} rows, expected 100")
    if list(df.columns) != columns:
        raise AssertionError(f"Unexpected columns: {list(df.columns)}")
    scores_arr = df["score"].to_numpy()
    for i in range(1, len(scores_arr)):
        if not scores_arr[i] <= scores_arr[i - 1] + 1e-9:
            raise AssertionError(
                f"DataFrame monotonicity violation at row {i}: "
                f"{scores_arr[i-1]:.8f} -> {scores_arr[i]:.8f}"
            )
    logger.info("Final DataFrame monotonicity assertion PASSED.")
    df.to_csv(out_path, index=False, encoding="utf-8")
    logger.info("Submission CSV written: %s", out_path)

    wall_elapsed = time.time() - wall_start
    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETE")
    logger.info("Wall-clock time: %.2fs (limit: 300s)", wall_elapsed)
    logger.info("Output: %s", out_path)
    logger.info("Candidates ranked: %d", len(df))
    logger.info("=" * 60)
    if wall_elapsed > 300:
        # The CSV is already on disk; the non-zero exit flags the overrun.
        logger.error(
            "TIMING VIOLATION: Pipeline took %.1fs > 300s limit", wall_elapsed
        )
        sys.exit(4)

    print(f"\n--- {os.path.basename(out_path)} (first 5 rows) ---")
    print(df.head(5).to_string(index=False))
    print(f"\nTotal rows: {len(df)}")
    print(f"Score range: [{df['score'].min():.6f}, {df['score'].max():.6f}]")
    print(f"Wall-clock: {wall_elapsed:.1f}s")


if __name__ == "__main__":
    main()