Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import itertools | |
| import re | |
| from dataclasses import dataclass | |
| from hashlib import sha1 | |
| _ALGO_BANK: list["AlgorithmSpec"] | None = None | |
| _BEST_SPEC_CACHE: dict[str, "AlgorithmSpec"] = {} | |
| class AlgorithmSpec: | |
| algorithm_id: int | |
| w_coverage: float | |
| w_stat: float | |
| w_risk: float | |
| w_novelty: float | |
| w_limit: float | |
| w_prior: float | |
| repeat_penalty: float | |
| def generate_100k_algorithms() -> list[AlgorithmSpec]: | |
| """Generate exactly 100,000 deterministic algorithm specs.""" | |
| global _ALGO_BANK | |
| if _ALGO_BANK is not None: | |
| return _ALGO_BANK | |
| out: list[AlgorithmSpec] = [] | |
| # 10 * 10 * 10 * 10 * 5 * 2 = 100,000 | |
| grids = [ | |
| [i / 10 for i in range(10)], | |
| [i / 10 for i in range(10)], | |
| [i / 10 for i in range(10)], | |
| [i / 10 for i in range(10)], | |
| [i / 5 for i in range(5)], | |
| [0.0, 1.0], | |
| ] | |
| idx = 0 | |
| for a, b, c, d, e, f in itertools.product(*grids): | |
| out.append( | |
| AlgorithmSpec( | |
| algorithm_id=idx, | |
| w_coverage=a, | |
| w_stat=b, | |
| w_risk=c, | |
| w_novelty=d, | |
| w_limit=e, | |
| w_prior=(idx % 5) / 5, | |
| repeat_penalty=f * 0.03, | |
| ) | |
| ) | |
| idx += 1 | |
| _ALGO_BANK = out | |
| return _ALGO_BANK | |
| def _query_features(sql: str) -> dict[str, float]: | |
| s = (sql or "").lower() | |
| return { | |
| "coverage": float(any(k in s for k in ["count(", "sum(", "avg(", "group by", "distinct"])), | |
| "stat": float(any(k in s for k in ["avg(", "stddev", "variance", "percentile", "try_cast", "strptime"])), | |
| "risk": float(any(k in s for k in ["drop", "truncate", "delete", "insert", "update", "alter", "create"])), | |
| "novelty": float(any(k in s for k in ["left join", "except", "not in", "having", "case when"])), | |
| "has_limit": float("limit" in s), | |
| } | |
| def _task_relevance(task_id: int, sql: str) -> float: | |
| s = (sql or "").lower() | |
| if task_id == 1: | |
| keys = ["null", "email", "customer_id", "duplicate", "group by"] | |
| elif task_id == 2: | |
| keys = ["quantity", "amount", "n/a", "try_cast", "order_date"] | |
| else: | |
| keys = ["transactions_baseline", "transactions_current", "category", "user_id", "avg(amount)"] | |
| hits = sum(1 for k in keys if k in s) | |
| return hits / max(1, len(keys)) | |
| def algorithm_rule_check(spec: AlgorithmSpec, queries: list[str], max_steps: int = 10) -> bool: | |
| """ | |
| Enforces constraints aligned with hackathon rules for this environment: | |
| - non-destructive SQL preference | |
| - bounded steps | |
| - deterministic finite parameters | |
| """ | |
| if max_steps <= 0 or max_steps > 10: | |
| return False | |
| if spec.w_risk < 0.0 or spec.w_risk > 1.0: | |
| return False | |
| if spec.repeat_penalty < 0.0 or spec.repeat_penalty > 0.03: | |
| return False | |
| for q in queries: | |
| s = (q or "").strip() | |
| if not s: | |
| return False | |
| if re.search(r"\b(drop|truncate|delete|insert|update|alter|create)\b", s, flags=re.IGNORECASE): | |
| return False | |
| if not re.match(r"^\s*(select|with)\b", s, flags=re.IGNORECASE): | |
| return False | |
| return True | |
| def rank_queries(task_id: int, queries: list[str], priors: list[float], spec: AlgorithmSpec) -> list[int]: | |
| scored: list[tuple[int, float]] = [] | |
| for i, q in enumerate(queries): | |
| f = _query_features(q) | |
| prior = priors[i] if i < len(priors) else 0.0 | |
| relevance = _task_relevance(task_id, q) | |
| score = ( | |
| spec.w_coverage * f["coverage"] | |
| + spec.w_stat * f["stat"] | |
| + spec.w_novelty * f["novelty"] | |
| + spec.w_limit * f["has_limit"] | |
| + spec.w_prior * prior | |
| + 0.8 * relevance | |
| - spec.w_risk * f["risk"] | |
| ) | |
| scored.append((i, score)) | |
| scored.sort(key=lambda x: x[1], reverse=True) | |
| return [i for i, _ in scored] | |
| def choose_best_algorithm(task_id: int, queries: list[str], priors: list[float], max_algorithms: int = 100_000) -> AlgorithmSpec: | |
| key_payload = f"t={task_id}|n={len(queries)}|m={max_algorithms}|q={'||'.join(queries)}|p={','.join(f'{x:.4f}' for x in priors)}" | |
| cache_key = sha1(key_payload.encode("utf-8")).hexdigest() | |
| if cache_key in _BEST_SPEC_CACHE: | |
| return _BEST_SPEC_CACHE[cache_key] | |
| algorithms = generate_100k_algorithms() | |
| n = min(max_algorithms, len(algorithms)) | |
| best = algorithms[0] | |
| best_obj = -1e18 | |
| for spec in algorithms[:n]: | |
| if not algorithm_rule_check(spec, queries, max_steps=10): | |
| continue | |
| ranking = rank_queries(task_id, queries, priors, spec) | |
| top = ranking[:2] | |
| obj = 0.0 | |
| for pos, i in enumerate(top): | |
| base = 2.0 - pos | |
| rel = _task_relevance(task_id, queries[i]) | |
| obj += base * rel | |
| # Prefer slight risk aversion | |
| obj -= 0.1 * spec.w_risk | |
| if obj > best_obj: | |
| best_obj = obj | |
| best = spec | |
| _BEST_SPEC_CACHE[cache_key] = best | |
| return best | |
| def order_queries_with_100k_algorithms(task_id: int, queries: list[str], priors: list[float]) -> list[str]: | |
| spec = choose_best_algorithm(task_id, queries, priors, max_algorithms=100_000) | |
| ranked_idx = rank_queries(task_id, queries, priors, spec) | |
| return [queries[i] for i in ranked_idx] | |