Spaces:
Sleeping
Sleeping
| # Sampling with Tournament | |
| # input: | |
| # original: original word | |
| # alternatives: substitute candidates | |
| # similarity: similarity scores | |
| # context: 1-token left context | |
| # key: secret key for hashing | |
| # m: number of tournament rounds. Set to 2 | |
| # c: number of candidates per round (not including original): c is set to the number of alternatives = K | |
| # alpha: scaling factor like temperature. Set to 1 | |
| # Process: | |
| # 1. Normalize similarity scores to [0,1] through a softmax with scaling factor alpha | |
| # 2. Create random score for each candidate with m rounds: so we have a matrix of shape (K, m) | |
| # 3. Each score is generated by hashing key, context, candidate, round index | |
| # 4. For the first round, sample M = K^m candidates from the normalized similarity scores (each candidate can appear multiple times) and divide them into groups of size K | |
| # 5. For each group, select the candidate with the highest random score to go to the next round. If they are tied, just randomly select one of the tied candidates. For example, K = 3, we have 1 1 0 score, so break the 1 1 tie randomly. | |
| # 6: For next round, use the winners from the previous round, but the scores from the matrix for that round. Repeat until we have one winner. | |
| # Output: return winner candidate index | |
| import hmac | |
| import hashlib | |
| import math | |
| from typing import List, Tuple | |
def _softmax(xs: List[float], alpha: float = 1.0) -> List[float]:
    """Return the softmax of ``xs`` scaled by ``alpha``.

    Every score is shifted by the largest score before exponentiation so
    the largest exponent is zero, then the exponentials are divided by
    their total so the result sums to 1.

    Args:
        xs: Raw scores.
        alpha: Multiplier applied to the shifted scores before ``exp``.

    Returns:
        A list with one probability per input score; an empty list when
        ``xs`` is empty.
    """
    if not xs:
        return []
    peak = max(xs)
    exps = [math.exp(alpha * (score - peak)) for score in xs]
    total = sum(exps)
    return [e / total for e in exps]
def _hash_bytes(key: str, payload: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of ``payload`` keyed by ``key``.

    Both strings are UTF-8 encoded before being handed to ``hmac``.
    """
    mac = hmac.new(
        key.encode("utf-8"),
        msg=payload.encode("utf-8"),
        digestmod=hashlib.sha256,
    )
    return mac.digest()
def _hash_to_uniform01(key: str, payload: str) -> float:
    """Map ``(key, payload)`` to a deterministic float in the unit interval.

    The first 8 bytes of the keyed digest are read as an unsigned
    big-endian 64-bit integer and divided by 2**64.

    Note: the quotient is a float, so integers within rounding distance of
    2**64 come out as exactly 1.0 rather than strictly below it.
    """
    digest = _hash_bytes(key, payload)
    leading = int.from_bytes(digest[:8], byteorder="big", signed=False)
    return leading / (1 << 64)
def _ctx_str(context: List[str]) -> str:
    """Reduce the left context to a lowercase string of its last token.

    A missing or empty context produces the empty string. Only the final
    token is used; widening the slice would include more tokens.
    """
    if not context:
        return ""
    tail = context[-1:]
    return " ".join(tail).lower()
def _per_round_score(key: str, ctx: str, candidate: str, round_idx: int) -> int:
    """Return the deterministic 0/1 score of ``candidate`` for one round.

    The score is derived from a keyed hash of the context, the candidate
    and the round index: the hash is mapped to a uniform value and
    thresholded at 0.5 (below -> 0, otherwise -> 1).
    """
    u = _hash_to_uniform01(key, f"score::{ctx}::{candidate}::r{round_idx}")
    if u < 0.5:
        return 0
    return 1
def _sample_categorical_from_uniform(u: float, probs: List[float]) -> int:
    """Pick an index from ``probs`` by inverse-CDF sampling with uniform ``u``.

    Walks the running sum of ``probs`` and returns the first index whose
    cumulative probability exceeds ``u``. If no index qualifies (for
    example because the float sum falls short of ``u``), the last index
    ``len(probs) - 1`` is returned.
    """
    cdf = 0.0
    for idx, weight in enumerate(probs):
        cdf += weight
        if cdf > u:
            return idx
    # Fallback when rounding leaves the cumulative sum at or below u.
    return len(probs) - 1
def _draw_M_candidates(key: str, ctx: str, probs: List[float], K: int, c: int, m: int) -> List[int]:
    """
    Draw M = c^m indices into ``probs`` with replacement.

    Each draw uses its own deterministic uniform value, hashed from the
    key, the context string, ``m``, ``c`` and the draw position, and is
    turned into an index by inverse-CDF sampling over ``probs``.
    ``K`` is accepted for interface compatibility but is not used here.
    """
    total_draws = c ** m
    uniforms = (
        _hash_to_uniform01(key, f"draw::{ctx}::m{m}::c{c}::i{position}")
        for position in range(total_draws)
    )
    return [_sample_categorical_from_uniform(u, probs) for u in uniforms]
def _run_tournament_round(
    key: str,
    ctx: str,
    picks: List[int],
    candidates: List[str],
    round_idx: int,
    group_size: int,
) -> List[int]:
    """
    Play one tournament round and return the winner of every group.

    ``picks`` is split into consecutive groups of ``group_size`` entries.
    Within a group the entry with the highest per-round score wins; ties
    are broken by a pseudo-random choice seeded from the key, context,
    round index and 1-based group number, so the outcome is reproducible.

    Args:
        key: Secret key used for the per-round scores and the tie-break seed.
        ctx: Context string mixed into the scores and the tie-break seed.
        picks: Indices into ``candidates`` competing in this round.
        candidates: Candidate words that the indices refer to.
        round_idx: Round number, mixed into the scores and the seed.
        group_size: Number of entries per group.

    Returns:
        One winning index (into ``candidates``) per group, in group order.

    Raises:
        AssertionError: If ``len(picks)`` is not a multiple of ``group_size``.
    """
    # Imported once per call rather than once per tied group.
    import random

    assert len(picks) % group_size == 0, "Group partition must divide evenly."

    winners: List[int] = []
    for start in range(0, len(picks), group_size):
        group = picks[start:start + group_size]  # indices into candidates
        group_num = start // group_size + 1  # 1-based; part of the tie-break seed

        scores = [
            _per_round_score(key, ctx, candidates[idx], round_idx)
            for idx in group
        ]
        best = max(scores)
        tied = [idx for idx, score in zip(group, scores) if score == best]

        if len(tied) == 1:
            winners.append(tied[0])
            continue

        # Tie-break. The seed derivation is unchanged so previously produced
        # winners stay the same, but the draw now comes from a private
        # Random instance: seeding and then re-seeding the module-level
        # generator would silently discard any seed the caller had set.
        seed_string = f"{key}:{ctx}:r{round_idx}:g{group_num}"
        seed_value = sum(ord(ch) * (pos + 1) for pos, ch in enumerate(seed_string))
        winners.append(random.Random(seed_value).choice(tied))

    return winners
def _show_score_matrix(key: str, ctx: str, candidates: List[str], m: int):
    """
    Debugging hook for the per-round score matrix; currently a no-op.

    The disabled implementation printed one row per candidate listing its
    ``_per_round_score`` for rounds 1..m. With that output switched off the
    function does nothing and returns ``None``.
    """
    # Intentionally empty: the score dump is disabled.
    return None
def tournament_randomize(
    original: str,
    alternatives: List[str],
    similarity: List[float],
    context: List[str],
    key: str,
    m: int = 2,
    c: int = 2,
    alpha: float = 1.0,
) -> int:
    """
    Select one alternative by keyed tournament sampling.

    Procedure:
      1) Turn ``similarity`` into probabilities with an alpha-scaled softmax.
      2) Draw c^m entrants (indices into ``alternatives``) from those
         probabilities using hashes of the key and context.
      3) Play ``m`` rounds: each round splits the current entrants into
         groups of ``c`` and keeps one winner per group, judged by that
         round's keyed per-candidate score.
      4) The single survivor is the result.

    Args:
        original: The word being replaced; not used in the computation.
        alternatives: Substitute candidates.
        similarity: One similarity score per alternative.
        context: Left-context tokens; reduced to a string by ``_ctx_str``.
        key: Secret key for all hashing.
        m: Number of tournament rounds (>= 1).
        c: Entrants per group (>= 2).
        alpha: Softmax scaling factor.

    Returns:
        Index of the winning candidate in ``alternatives``.

    Raises:
        AssertionError: If the inputs are inconsistent or the rounds do not
            end with exactly one survivor.
    """
    assert len(alternatives) == len(similarity) > 0, "Need at least one alternative with a similarity score."
    assert m >= 1, "Tournament rounds m must be >= 1"
    assert c >= 2, "Number of competitors per match c must be >= 2"

    n_alternatives = len(alternatives)
    ctx = _ctx_str(context)
    probs = _softmax(similarity, alpha=alpha)

    # Entrants for round 1: c^m draws, each an index into alternatives.
    survivors = _draw_M_candidates(key, ctx, probs, n_alternatives, c, m)

    # Optional debugging view of every candidate's per-round scores.
    _show_score_matrix(key, ctx, alternatives, m)

    # Every round shrinks the field by a factor of c.
    for round_no in range(1, m + 1):
        survivors = _run_tournament_round(
            key=key,
            ctx=ctx,
            picks=survivors,
            candidates=alternatives,
            round_idx=round_no,
            group_size=c,
        )

    assert len(survivors) == 1, f"Expected a single winner after {m} rounds, got {len(survivors)}"
    return survivors[0]
| # Convenience wrapper that returns the selected word | |
def tournament_select_word(
    original: str,
    alternatives: List[str],
    similarity: List[float],
    context: List[str],
    key: str,
    m: int = 2,
    c: int = 2,
    alpha: float = 1.0,
) -> str:
    """Run ``tournament_randomize`` and return the winning word, not its index.

    All arguments are forwarded unchanged; the result is the element of
    ``alternatives`` at the winning index.
    """
    winner = tournament_randomize(
        original,
        alternatives,
        similarity,
        context,
        key,
        m,
        c,
        alpha,
    )
    chosen_word = alternatives[winner]
    return chosen_word
| # # Sample running | |
| # original = "big" | |
| # alts = ["large", "huge", "massive"] # K = 3 | |
| # sims = [0.82, 0.55, 0.60] # any similarity scores | |
| # ctx = ["a"] # 1-token left context (e.g., "... a ___") | |
| # key = "super-secret-key" | |
| # m = 2 | |
| # c = 2 | |
| # alpha = 1.0 | |
| # winner_idx = tournament_randomize(original, alts, sims, ctx, key, m, c, alpha) | |
| # print("Winner index:", winner_idx, "->", alts[winner_idx]) | |
| # # Or directly: | |
| # print("Winner word:", tournament_select_word(original, alts, sims, ctx, key, m, c, alpha)) | |
| # # Test with a different key | |
| # key2 = "different-secret-key" | |
| # winner_idx2 = tournament_randomize(original, alts, sims, ctx, key2, m, c, alpha) | |
| # print("Winner index with different key:", winner_idx2, "->", alts[winner_idx2]) | |
| # # Test with a different key | |
| # key3 = "key224242" | |
| # winner_idx3 = tournament_randomize(original, alts, sims, ctx, key3, m, c, alpha) | |
| # print("Winner index with different key:", winner_idx3, "->", alts[winner_idx3]) | |