Spaces:
Running
Running
| """ | |
| ROSA-QKV 算法实现及步骤追踪 | |
| """ | |
| import json | |
| import time | |
| from typing import List, Dict, Any, Tuple | |
| def find_line_number(lines: list[str], needle: str, fallback: int = 1) -> int: | |
| """在代码行列表中查找包含特定字符串的行号""" | |
| for index, line in enumerate(lines, start=1): | |
| if needle in line: | |
| return index | |
| return fallback | |
| def find_line_after( | |
| lines: list[str], start_line: int, needle: str, fallback: int | |
| ) -> int: | |
| """从指定行开始查找包含特定字符串的行号""" | |
| start_index = min(max(start_line, 0), len(lines)) | |
| for index in range(start_index, len(lines)): | |
| if needle in lines[index]: | |
| return index + 1 | |
| return fallback | |
| def initialize_line_numbers(code: str) -> Dict[str, int]: | |
| """初始化代码中各关键行的行号""" | |
| lines = code.splitlines() | |
| return { | |
| "LINE_FOR_I": find_line_number(lines, "for i in range(n):"), | |
| "LINE_FOR_W": find_line_number(lines, "for w in range(i+1,0,-1):"), | |
| "LINE_T_ASSIGN": find_line_number(lines, "t=qqq[i+1-w:i+1]"), | |
| "LINE_FOR_J": find_line_number(lines, "for j in range(i-w,-1,-1):"), | |
| "LINE_TRY": find_line_number(lines, "if kkk[j:j+w]==t:"), | |
| "LINE_ASSIGN": find_line_number(lines, "out[i]=vvv[j+w]"), | |
| "LINE_BREAK_INNER": find_line_after( | |
| lines, | |
| find_line_number(lines, "out[i]=vvv[j+w]"), | |
| "break", | |
| find_line_number(lines, "out[i]=vvv[j+w]"), | |
| ), | |
| "LINE_IF_OUT": find_line_number(lines, "if out[i]!=-1:"), | |
| "LINE_BREAK_OUTER": find_line_after( | |
| lines, | |
| find_line_number(lines, "if out[i]!=-1:"), | |
| "break", | |
| find_line_number(lines, "if out[i]!=-1:"), | |
| ), | |
| "LINE_RETURN": find_line_number(lines, "return out"), | |
| } | |
| def rosa_steps( | |
| q: list[int], k: list[int], v: list[int], line_numbers: Dict[str, int] | |
| ) -> Tuple[List[Dict], List[int]]: | |
| """ | |
| 执行 ROSA-QKV 算法并记录每个步骤 | |
| 返回: (步骤列表, 输出列表) | |
| """ | |
| n = len(q) | |
| out = [-1] * n | |
| steps: list[dict] = [] | |
| for i in range(n): | |
| steps.append({"phase": "loop_i", "i": i, "line": line_numbers["LINE_FOR_I"]}) | |
| matched = False | |
| match_j = -1 | |
| match_w = -1 | |
| for w in range(i + 1, 0, -1): | |
| steps.append( | |
| {"phase": "loop_w", "i": i, "w": w, "line": line_numbers["LINE_FOR_W"]} | |
| ) | |
| t = q[i + 1 - w : i + 1] | |
| t_str = "".join(str(x) for x in t) | |
| steps.append( | |
| { | |
| "phase": "assign_t", | |
| "i": i, | |
| "w": w, | |
| "t": t_str, | |
| "line": line_numbers["LINE_T_ASSIGN"], | |
| } | |
| ) | |
| for j in range(i - w, -1, -1): | |
| steps.append( | |
| { | |
| "phase": "loop_j", | |
| "i": i, | |
| "w": w, | |
| "j": j, | |
| "line": line_numbers["LINE_FOR_J"], | |
| } | |
| ) | |
| is_match = k[j : j + w] == t | |
| steps.append( | |
| { | |
| "phase": "try", | |
| "i": i, | |
| "w": w, | |
| "j": j, | |
| "t": t_str, | |
| "matched": is_match, | |
| "line": line_numbers["LINE_TRY"], | |
| } | |
| ) | |
| if is_match: | |
| value = v[j + w] | |
| out[i] = value | |
| match_j = j | |
| match_w = w | |
| steps.append( | |
| { | |
| "phase": "assign", | |
| "i": i, | |
| "w": w, | |
| "j": j, | |
| "t": t_str, | |
| "v_index": j + w, | |
| "value": value, | |
| "line": line_numbers["LINE_ASSIGN"], | |
| } | |
| ) | |
| steps.append( | |
| { | |
| "phase": "break_inner", | |
| "i": i, | |
| "w": w, | |
| "j": j, | |
| "line": line_numbers["LINE_BREAK_INNER"], | |
| } | |
| ) | |
| matched = True | |
| break | |
| if matched: | |
| steps.append( | |
| { | |
| "phase": "check", | |
| "i": i, | |
| "w": match_w, | |
| "j": match_j, | |
| "matched": True, | |
| "line": line_numbers["LINE_IF_OUT"], | |
| } | |
| ) | |
| if i == n - 1: | |
| steps.append( | |
| { | |
| "phase": "break_outer", | |
| "i": i, | |
| "w": match_w, | |
| "j": match_j, | |
| "line": line_numbers["LINE_BREAK_OUTER"], | |
| } | |
| ) | |
| break | |
| if not matched: | |
| out[i] = -1 | |
| steps.append( | |
| { | |
| "phase": "output", | |
| "i": i, | |
| "value": out[i], | |
| "matched": matched, | |
| } | |
| ) | |
| if i == n - 1: | |
| steps.append({"phase": "return", "line": line_numbers["LINE_RETURN"]}) | |
| return steps, out | |
| def format_output(out: list[int]) -> str: | |
| """格式化输出结果""" | |
| if any(value < 0 or value > 1 for value in out): | |
| return " ".join(str(value) for value in out) | |
| return "".join(str(value) for value in out) | |
| def build_payload( | |
| q: list[int], k: list[int], v: list[int], line_numbers: Dict[str, int] | |
| ) -> Tuple[str, str]: | |
| """构建 JSON 负载和格式化输出""" | |
| steps, out = rosa_steps(q, k, v, line_numbers) | |
| payload = { | |
| "q": q, | |
| "k": k, | |
| "v": v, | |
| "steps": steps, | |
| "out": out, | |
| "run_id": time.time_ns(), | |
| } | |
| return json.dumps(payload, ensure_ascii=False), format_output(out) | |