""" ROSA-QKV 算法实现及步骤追踪 """ import json import time from typing import List, Dict, Any, Tuple def find_line_number(lines: list[str], needle: str, fallback: int = 1) -> int: """在代码行列表中查找包含特定字符串的行号""" for index, line in enumerate(lines, start=1): if needle in line: return index return fallback def find_line_after( lines: list[str], start_line: int, needle: str, fallback: int ) -> int: """从指定行开始查找包含特定字符串的行号""" start_index = min(max(start_line, 0), len(lines)) for index in range(start_index, len(lines)): if needle in lines[index]: return index + 1 return fallback def initialize_line_numbers(code: str) -> Dict[str, int]: """初始化代码中各关键行的行号""" lines = code.splitlines() return { "LINE_FOR_I": find_line_number(lines, "for i in range(n):"), "LINE_FOR_W": find_line_number(lines, "for w in range(i+1,0,-1):"), "LINE_T_ASSIGN": find_line_number(lines, "t=qqq[i+1-w:i+1]"), "LINE_FOR_J": find_line_number(lines, "for j in range(i-w,-1,-1):"), "LINE_TRY": find_line_number(lines, "if kkk[j:j+w]==t:"), "LINE_ASSIGN": find_line_number(lines, "out[i]=vvv[j+w]"), "LINE_BREAK_INNER": find_line_after( lines, find_line_number(lines, "out[i]=vvv[j+w]"), "break", find_line_number(lines, "out[i]=vvv[j+w]"), ), "LINE_IF_OUT": find_line_number(lines, "if out[i]!=-1:"), "LINE_BREAK_OUTER": find_line_after( lines, find_line_number(lines, "if out[i]!=-1:"), "break", find_line_number(lines, "if out[i]!=-1:"), ), "LINE_RETURN": find_line_number(lines, "return out"), } def rosa_steps( q: list[int], k: list[int], v: list[int], line_numbers: Dict[str, int] ) -> Tuple[List[Dict], List[int]]: """ 执行 ROSA-QKV 算法并记录每个步骤 返回: (步骤列表, 输出列表) """ n = len(q) out = [-1] * n steps: list[dict] = [] for i in range(n): steps.append({"phase": "loop_i", "i": i, "line": line_numbers["LINE_FOR_I"]}) matched = False match_j = -1 match_w = -1 for w in range(i + 1, 0, -1): steps.append( {"phase": "loop_w", "i": i, "w": w, "line": line_numbers["LINE_FOR_W"]} ) t = q[i + 1 - w : i + 1] t_str = "".join(str(x) for x in t) steps.append( { "phase": "assign_t", "i": i, "w": w, "t": t_str, "line": line_numbers["LINE_T_ASSIGN"], } ) for j in range(i - w, -1, -1): steps.append( { "phase": "loop_j", "i": i, "w": w, "j": j, "line": line_numbers["LINE_FOR_J"], } ) is_match = k[j : j + w] == t steps.append( { "phase": "try", "i": i, "w": w, "j": j, "t": t_str, "matched": is_match, "line": line_numbers["LINE_TRY"], } ) if is_match: value = v[j + w] out[i] = value match_j = j match_w = w steps.append( { "phase": "assign", "i": i, "w": w, "j": j, "t": t_str, "v_index": j + w, "value": value, "line": line_numbers["LINE_ASSIGN"], } ) steps.append( { "phase": "break_inner", "i": i, "w": w, "j": j, "line": line_numbers["LINE_BREAK_INNER"], } ) matched = True break if matched: steps.append( { "phase": "check", "i": i, "w": match_w, "j": match_j, "matched": True, "line": line_numbers["LINE_IF_OUT"], } ) if i == n - 1: steps.append( { "phase": "break_outer", "i": i, "w": match_w, "j": match_j, "line": line_numbers["LINE_BREAK_OUTER"], } ) break if not matched: out[i] = -1 steps.append( { "phase": "output", "i": i, "value": out[i], "matched": matched, } ) if i == n - 1: steps.append({"phase": "return", "line": line_numbers["LINE_RETURN"]}) return steps, out def format_output(out: list[int]) -> str: """格式化输出结果""" if any(value < 0 or value > 1 for value in out): return " ".join(str(value) for value in out) return "".join(str(value) for value in out) def build_payload( q: list[int], k: list[int], v: list[int], line_numbers: Dict[str, int] ) -> Tuple[str, str]: """构建 JSON 负载和格式化输出""" steps, out = rosa_steps(q, k, v, line_numbers) payload = { "q": q, "k": k, "v": v, "steps": steps, "out": out, "run_id": time.time_ns(), } return json.dumps(payload, ensure_ascii=False), format_output(out)