# Extraction residue from a hosted file viewer: revision 91e7690, file size 5,432 bytes.
from __future__ import annotations
import itertools
import re
from dataclasses import dataclass
from hashlib import sha1
# Lazily built bank of algorithm specs: stays None until generate_100k_algorithms()
# populates it on first call, after which the same list object is returned.
_ALGO_BANK: list["AlgorithmSpec"] | None = None
# Memo of choose_best_algorithm() results, keyed by the SHA-1 hex digest of a string
# derived from that call's arguments. Entries are never evicted in this file.
_BEST_SPEC_CACHE: dict[str, "AlgorithmSpec"] = {}
@dataclass(frozen=True)
class AlgorithmSpec:
    """Immutable set of weights describing one query-ranking algorithm.

    Instances are produced by ``generate_100k_algorithms`` and consumed by
    ``rank_queries``, which combines the weights with per-query features
    into a single score.
    """

    # Position of this spec in generation order.
    algorithm_id: int
    # Weight applied to the "coverage" query feature.
    w_coverage: float
    # Weight applied to the "stat" query feature.
    w_stat: float
    # Weight applied (as a penalty, i.e. subtracted) to the "risk" feature.
    w_risk: float
    # Weight applied to the "novelty" query feature.
    w_novelty: float
    # Weight applied to the "has_limit" query feature.
    w_limit: float
    # Weight applied to the caller-supplied prior of each query.
    w_prior: float
    # Range-checked by algorithm_rule_check (must lie in [0.0, 0.03]).
    # NOTE(review): not read by the scoring code visible in this file.
    repeat_penalty: float
def generate_100k_algorithms() -> list[AlgorithmSpec]:
    """Generate exactly 100,000 deterministic algorithm specs.

    The specs are the Cartesian product of six parameter grids. The result
    is built once, stored in the module-level ``_ALGO_BANK`` and the same
    list object is returned on every later call.

    Returns:
        The list of specs, with ``algorithm_id`` equal to each spec's
        position in the list.
    """
    global _ALGO_BANK
    if _ALGO_BANK is not None:
        return _ALGO_BANK

    tenths = [i / 10 for i in range(10)]
    fifths = [i / 5 for i in range(5)]
    # 10 * 10 * 10 * 10 * 5 * 2 = 100,000
    grids = [tenths, tenths, tenths, tenths, fifths, [0.0, 1.0]]

    _ALGO_BANK = [
        AlgorithmSpec(
            algorithm_id=idx,
            w_coverage=a,
            w_stat=b,
            w_risk=c,
            w_novelty=d,
            w_limit=e,
            # w_prior is not a grid axis of its own; it is derived from the
            # running index so it cycles through 0.0, 0.2, ..., 0.8.
            w_prior=(idx % 5) / 5,
            # f is 0.0 or 1.0, so the penalty is either 0.0 or 0.03.
            repeat_penalty=f * 0.03,
        )
        for idx, (a, b, c, d, e, f) in enumerate(itertools.product(*grids))
    ]
    return _ALGO_BANK
# Whole-word match for write/DDL keywords, mirroring the word-boundary check
# used by algorithm_rule_check so that identifiers such as "created_at" or
# "updated_at" are not mistaken for destructive statements.
_RISK_KEYWORD_RE = re.compile(r"\b(drop|truncate|delete|insert|update|alter|create)\b")


def _query_features(sql: str) -> dict[str, float]:
    """Extract binary (0.0 / 1.0) keyword features from a SQL string.

    Matching is case-insensitive; ``None`` or an empty string yields all
    zeros.

    Args:
        sql: The SQL text to inspect.

    Returns:
        A dict with the keys ``"coverage"``, ``"stat"``, ``"risk"``,
        ``"novelty"`` and ``"has_limit"``, each 1.0 when the corresponding
        keyword group is present and 0.0 otherwise.
    """
    s = (sql or "").lower()

    def has_any(needles: tuple[str, ...]) -> float:
        # Plain substring test: 1.0 if any needle occurs in the query.
        return float(any(k in s for k in needles))

    return {
        "coverage": has_any(("count(", "sum(", "avg(", "group by", "distinct")),
        "stat": has_any(("avg(", "stddev", "variance", "percentile", "try_cast", "strptime")),
        # Word-boundary match (not substring) to avoid false positives on
        # column names that merely contain a keyword.
        "risk": float(_RISK_KEYWORD_RE.search(s) is not None),
        "novelty": has_any(("left join", "except", "not in", "having", "case when")),
        "has_limit": float("limit" in s),
    }
def _task_relevance(task_id: int, sql: str) -> float:
    """Return the fraction of task-specific keywords found in a SQL string.

    Matching is a case-insensitive substring test.

    Args:
        task_id: Selects the keyword set; 1 and 2 have their own sets and
            every other value falls through to the third set.
        sql: The SQL text to inspect; ``None`` is treated as empty.

    Returns:
        A value in [0.0, 1.0]: matched keywords divided by the size of the
        keyword set.
    """
    s = (sql or "").lower()
    if task_id == 1:
        keys = ["null", "email", "customer_id", "duplicate", "group by"]
    elif task_id == 2:
        keys = ["quantity", "amount", "n/a", "try_cast", "order_date"]
    else:
        keys = ["transactions_baseline", "transactions_current", "category", "user_id", "avg(amount)"]
    hits = sum(1 for k in keys if k in s)
    # max(1, ...) guards the division should a keyword set ever be empty.
    return hits / max(1, len(keys))
# Any write/DDL keyword appearing as a whole word, in any letter case.
_DESTRUCTIVE_SQL_RE = re.compile(
    r"\b(drop|truncate|delete|insert|update|alter|create)\b", flags=re.IGNORECASE
)
# A query must begin with SELECT or WITH (leading whitespace allowed).
_READ_ONLY_START_RE = re.compile(r"^\s*(select|with)\b", flags=re.IGNORECASE)


def algorithm_rule_check(spec: AlgorithmSpec, queries: list[str], max_steps: int = 10) -> bool:
    """Check a spec and its candidate queries against the environment rules.

    Enforces constraints aligned with hackathon rules for this environment:
    - non-destructive SQL preference
    - bounded steps
    - deterministic finite parameters

    Args:
        spec: Algorithm whose ``w_risk`` and ``repeat_penalty`` are
            range-checked.
        queries: SQL strings; every one must be non-blank, start with
            SELECT or WITH, and contain no write/DDL keyword.
        max_steps: Step budget; must be between 1 and 10 inclusive.

    Returns:
        True when every check passes, False otherwise. An empty ``queries``
        list passes the query checks.
    """
    if max_steps <= 0 or max_steps > 10:
        return False
    if not 0.0 <= spec.w_risk <= 1.0:
        return False
    if not 0.0 <= spec.repeat_penalty <= 0.03:
        return False
    for q in queries:
        s = (q or "").strip()
        if not s:
            return False
        # The keyword check runs on the whole text, so a keyword inside a
        # string literal or comment also rejects the query.
        if _DESTRUCTIVE_SQL_RE.search(s):
            return False
        if not _READ_ONLY_START_RE.match(s):
            return False
    return True
def rank_queries(task_id: int, queries: list[str], priors: list[float], spec: AlgorithmSpec) -> list[int]:
    """Order query indices from highest to lowest score under ``spec``.

    Each query's score is the weighted sum of its keyword features and its
    prior, plus a fixed 0.8 weight on task relevance, minus the weighted
    risk feature.

    Args:
        task_id: Task identifier forwarded to the relevance scorer.
        queries: Candidate SQL strings.
        priors: Per-query prior scores; a query with no matching entry
            (``priors`` shorter than ``queries``) gets a prior of 0.0.
        spec: Weights to apply.

    Returns:
        Indices into ``queries``, best first. Queries with equal scores
        keep their original relative order.
    """
    scores: list[float] = []
    for i, q in enumerate(queries):
        f = _query_features(q)
        prior = priors[i] if i < len(priors) else 0.0
        relevance = _task_relevance(task_id, q)
        scores.append(
            spec.w_coverage * f["coverage"]
            + spec.w_stat * f["stat"]
            + spec.w_novelty * f["novelty"]
            + spec.w_limit * f["has_limit"]
            + spec.w_prior * prior
            + 0.8 * relevance
            - spec.w_risk * f["risk"]
        )
    # sorted() is stable, including with reverse=True, so ties stay in input order.
    return sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
def choose_best_algorithm(task_id: int, queries: list[str], priors: list[float], max_algorithms: int = 100_000) -> AlgorithmSpec:
    """Pick the spec whose ranking puts the most task-relevant queries on top.

    Every candidate spec that passes ``algorithm_rule_check`` is scored by
    the task relevance of its top two ranked queries (weight 2.0 for first
    place, 1.0 for second) minus ``0.1 * w_risk``. The first spec reaching
    the highest score wins. Results are memoised in ``_BEST_SPEC_CACHE``.

    Args:
        task_id: Task identifier forwarded to ranking and relevance.
        queries: Candidate SQL strings.
        priors: Per-query prior scores (compared to 4 decimals for caching).
        max_algorithms: Upper bound on how many specs from the bank are
            evaluated; values below zero are treated as zero.

    Returns:
        The winning spec, or the first spec in the bank when no candidate
        passes the rule check (or none is evaluated).
    """
    # repr() of a tuple of lists keeps element boundaries unambiguous (a
    # plain "||".join could collide) and `q or ""` tolerates None entries.
    key_payload = repr(
        (
            task_id,
            max_algorithms,
            [q or "" for q in queries],
            [f"{x:.4f}" for x in priors],
        )
    )
    cache_key = sha1(key_payload.encode("utf-8")).hexdigest()
    cached = _BEST_SPEC_CACHE.get(cache_key)
    if cached is not None:
        return cached

    algorithms = generate_100k_algorithms()
    # Clamp so a negative limit cannot turn into an "all but the last k" slice.
    n = max(0, min(max_algorithms, len(algorithms)))
    # Relevance depends only on (task_id, query), so compute it once
    # instead of once per candidate spec.
    relevance = [_task_relevance(task_id, q) for q in queries]

    best = algorithms[0]
    best_obj = float("-inf")
    for spec in algorithms[:n]:
        if not algorithm_rule_check(spec, queries, max_steps=10):
            continue
        ranking = rank_queries(task_id, queries, priors, spec)
        obj = sum((2.0 - pos) * relevance[i] for pos, i in enumerate(ranking[:2]))
        # Prefer slight risk aversion
        obj -= 0.1 * spec.w_risk
        # Strict comparison: on ties the earliest spec is kept.
        if obj > best_obj:
            best_obj = obj
            best = spec

    _BEST_SPEC_CACHE[cache_key] = best
    return best
def order_queries_with_100k_algorithms(task_id: int, queries: list[str], priors: list[float]) -> list[str]:
    """Return ``queries`` reordered best-first by the best-scoring algorithm.

    Selects a spec with ``choose_best_algorithm`` (searching up to 100,000
    candidates) and then ranks the queries with that spec.

    Args:
        task_id: Task identifier forwarded to selection and ranking.
        queries: Candidate SQL strings.
        priors: Per-query prior scores.

    Returns:
        A new list containing the same query strings in ranked order.
    """
    spec = choose_best_algorithm(task_id, queries, priors, max_algorithms=100_000)
    ranked_idx = rank_queries(task_id, queries, priors, spec)
    return [queries[i] for i in ranked_idx]
# End of extracted file.