File size: 5,432 Bytes
91e7690
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from __future__ import annotations

import itertools
import re
from dataclasses import dataclass
from hashlib import sha1


_ALGO_BANK: list["AlgorithmSpec"] | None = None
_BEST_SPEC_CACHE: dict[str, "AlgorithmSpec"] = {}


@dataclass(frozen=True)
class AlgorithmSpec:
    algorithm_id: int
    w_coverage: float
    w_stat: float
    w_risk: float
    w_novelty: float
    w_limit: float
    w_prior: float
    repeat_penalty: float


def generate_100k_algorithms() -> list[AlgorithmSpec]:
    """Generate exactly 100,000 deterministic algorithm specs."""
    global _ALGO_BANK
    if _ALGO_BANK is not None:
        return _ALGO_BANK

    out: list[AlgorithmSpec] = []
    # 10 * 10 * 10 * 10 * 5 * 2 = 100,000
    grids = [
        [i / 10 for i in range(10)],
        [i / 10 for i in range(10)],
        [i / 10 for i in range(10)],
        [i / 10 for i in range(10)],
        [i / 5 for i in range(5)],
        [0.0, 1.0],
    ]

    idx = 0
    for a, b, c, d, e, f in itertools.product(*grids):
        out.append(
            AlgorithmSpec(
                algorithm_id=idx,
                w_coverage=a,
                w_stat=b,
                w_risk=c,
                w_novelty=d,
                w_limit=e,
                w_prior=(idx % 5) / 5,
                repeat_penalty=f * 0.03,
            )
        )
        idx += 1

    _ALGO_BANK = out
    return _ALGO_BANK


def _query_features(sql: str) -> dict[str, float]:
    s = (sql or "").lower()
    return {
        "coverage": float(any(k in s for k in ["count(", "sum(", "avg(", "group by", "distinct"])),
        "stat": float(any(k in s for k in ["avg(", "stddev", "variance", "percentile", "try_cast", "strptime"])),
        "risk": float(any(k in s for k in ["drop", "truncate", "delete", "insert", "update", "alter", "create"])),
        "novelty": float(any(k in s for k in ["left join", "except", "not in", "having", "case when"])),
        "has_limit": float("limit" in s),
    }


def _task_relevance(task_id: int, sql: str) -> float:
    s = (sql or "").lower()
    if task_id == 1:
        keys = ["null", "email", "customer_id", "duplicate", "group by"]
    elif task_id == 2:
        keys = ["quantity", "amount", "n/a", "try_cast", "order_date"]
    else:
        keys = ["transactions_baseline", "transactions_current", "category", "user_id", "avg(amount)"]
    hits = sum(1 for k in keys if k in s)
    return hits / max(1, len(keys))


def algorithm_rule_check(spec: AlgorithmSpec, queries: list[str], max_steps: int = 10) -> bool:
    """
    Enforces constraints aligned with hackathon rules for this environment:
    - non-destructive SQL preference
    - bounded steps
    - deterministic finite parameters
    """
    if max_steps <= 0 or max_steps > 10:
        return False
    if spec.w_risk < 0.0 or spec.w_risk > 1.0:
        return False
    if spec.repeat_penalty < 0.0 or spec.repeat_penalty > 0.03:
        return False

    for q in queries:
        s = (q or "").strip()
        if not s:
            return False
        if re.search(r"\b(drop|truncate|delete|insert|update|alter|create)\b", s, flags=re.IGNORECASE):
            return False
        if not re.match(r"^\s*(select|with)\b", s, flags=re.IGNORECASE):
            return False
    return True


def rank_queries(task_id: int, queries: list[str], priors: list[float], spec: AlgorithmSpec) -> list[int]:
    scored: list[tuple[int, float]] = []
    for i, q in enumerate(queries):
        f = _query_features(q)
        prior = priors[i] if i < len(priors) else 0.0
        relevance = _task_relevance(task_id, q)
        score = (
            spec.w_coverage * f["coverage"]
            + spec.w_stat * f["stat"]
            + spec.w_novelty * f["novelty"]
            + spec.w_limit * f["has_limit"]
            + spec.w_prior * prior
            + 0.8 * relevance
            - spec.w_risk * f["risk"]
        )
        scored.append((i, score))
    scored.sort(key=lambda x: x[1], reverse=True)
    return [i for i, _ in scored]


def choose_best_algorithm(task_id: int, queries: list[str], priors: list[float], max_algorithms: int = 100_000) -> AlgorithmSpec:
    key_payload = f"t={task_id}|n={len(queries)}|m={max_algorithms}|q={'||'.join(queries)}|p={','.join(f'{x:.4f}' for x in priors)}"
    cache_key = sha1(key_payload.encode("utf-8")).hexdigest()
    if cache_key in _BEST_SPEC_CACHE:
        return _BEST_SPEC_CACHE[cache_key]

    algorithms = generate_100k_algorithms()
    n = min(max_algorithms, len(algorithms))

    best = algorithms[0]
    best_obj = -1e18

    for spec in algorithms[:n]:
        if not algorithm_rule_check(spec, queries, max_steps=10):
            continue
        ranking = rank_queries(task_id, queries, priors, spec)
        top = ranking[:2]
        obj = 0.0
        for pos, i in enumerate(top):
            base = 2.0 - pos
            rel = _task_relevance(task_id, queries[i])
            obj += base * rel
        # Prefer slight risk aversion
        obj -= 0.1 * spec.w_risk
        if obj > best_obj:
            best_obj = obj
            best = spec

    _BEST_SPEC_CACHE[cache_key] = best
    return best


def order_queries_with_100k_algorithms(task_id: int, queries: list[str], priors: list[float]) -> list[str]:
    spec = choose_best_algorithm(task_id, queries, priors, max_algorithms=100_000)
    ranked_idx = rank_queries(task_id, queries, priors, spec)
    return [queries[i] for i in ranked_idx]