ROSA-Visualizer / algorithm.py
Jellyfish042's picture
黑夜模式支持 (#1)
53fc829 verified
"""
ROSA-QKV 算法实现及步骤追踪
"""
import json
import time
from typing import List, Dict, Any, Tuple
def find_line_number(lines: list[str], needle: str, fallback: int = 1) -> int:
"""在代码行列表中查找包含特定字符串的行号"""
for index, line in enumerate(lines, start=1):
if needle in line:
return index
return fallback
def find_line_after(
lines: list[str], start_line: int, needle: str, fallback: int
) -> int:
"""从指定行开始查找包含特定字符串的行号"""
start_index = min(max(start_line, 0), len(lines))
for index in range(start_index, len(lines)):
if needle in lines[index]:
return index + 1
return fallback
def initialize_line_numbers(code: str) -> Dict[str, int]:
"""初始化代码中各关键行的行号"""
lines = code.splitlines()
return {
"LINE_FOR_I": find_line_number(lines, "for i in range(n):"),
"LINE_FOR_W": find_line_number(lines, "for w in range(i+1,0,-1):"),
"LINE_T_ASSIGN": find_line_number(lines, "t=qqq[i+1-w:i+1]"),
"LINE_FOR_J": find_line_number(lines, "for j in range(i-w,-1,-1):"),
"LINE_TRY": find_line_number(lines, "if kkk[j:j+w]==t:"),
"LINE_ASSIGN": find_line_number(lines, "out[i]=vvv[j+w]"),
"LINE_BREAK_INNER": find_line_after(
lines,
find_line_number(lines, "out[i]=vvv[j+w]"),
"break",
find_line_number(lines, "out[i]=vvv[j+w]"),
),
"LINE_IF_OUT": find_line_number(lines, "if out[i]!=-1:"),
"LINE_BREAK_OUTER": find_line_after(
lines,
find_line_number(lines, "if out[i]!=-1:"),
"break",
find_line_number(lines, "if out[i]!=-1:"),
),
"LINE_RETURN": find_line_number(lines, "return out"),
}
def rosa_steps(
q: list[int], k: list[int], v: list[int], line_numbers: Dict[str, int]
) -> Tuple[List[Dict], List[int]]:
"""
执行 ROSA-QKV 算法并记录每个步骤
返回: (步骤列表, 输出列表)
"""
n = len(q)
out = [-1] * n
steps: list[dict] = []
for i in range(n):
steps.append({"phase": "loop_i", "i": i, "line": line_numbers["LINE_FOR_I"]})
matched = False
match_j = -1
match_w = -1
for w in range(i + 1, 0, -1):
steps.append(
{"phase": "loop_w", "i": i, "w": w, "line": line_numbers["LINE_FOR_W"]}
)
t = q[i + 1 - w : i + 1]
t_str = "".join(str(x) for x in t)
steps.append(
{
"phase": "assign_t",
"i": i,
"w": w,
"t": t_str,
"line": line_numbers["LINE_T_ASSIGN"],
}
)
for j in range(i - w, -1, -1):
steps.append(
{
"phase": "loop_j",
"i": i,
"w": w,
"j": j,
"line": line_numbers["LINE_FOR_J"],
}
)
is_match = k[j : j + w] == t
steps.append(
{
"phase": "try",
"i": i,
"w": w,
"j": j,
"t": t_str,
"matched": is_match,
"line": line_numbers["LINE_TRY"],
}
)
if is_match:
value = v[j + w]
out[i] = value
match_j = j
match_w = w
steps.append(
{
"phase": "assign",
"i": i,
"w": w,
"j": j,
"t": t_str,
"v_index": j + w,
"value": value,
"line": line_numbers["LINE_ASSIGN"],
}
)
steps.append(
{
"phase": "break_inner",
"i": i,
"w": w,
"j": j,
"line": line_numbers["LINE_BREAK_INNER"],
}
)
matched = True
break
if matched:
steps.append(
{
"phase": "check",
"i": i,
"w": match_w,
"j": match_j,
"matched": True,
"line": line_numbers["LINE_IF_OUT"],
}
)
if i == n - 1:
steps.append(
{
"phase": "break_outer",
"i": i,
"w": match_w,
"j": match_j,
"line": line_numbers["LINE_BREAK_OUTER"],
}
)
break
if not matched:
out[i] = -1
steps.append(
{
"phase": "output",
"i": i,
"value": out[i],
"matched": matched,
}
)
if i == n - 1:
steps.append({"phase": "return", "line": line_numbers["LINE_RETURN"]})
return steps, out
def format_output(out: list[int]) -> str:
"""格式化输出结果"""
if any(value < 0 or value > 1 for value in out):
return " ".join(str(value) for value in out)
return "".join(str(value) for value in out)
def build_payload(
q: list[int], k: list[int], v: list[int], line_numbers: Dict[str, int]
) -> Tuple[str, str]:
"""构建 JSON 负载和格式化输出"""
steps, out = rosa_steps(q, k, v, line_numbers)
payload = {
"q": q,
"k": k,
"v": v,
"steps": steps,
"out": out,
"run_id": time.time_ns(),
}
return json.dumps(payload, ensure_ascii=False), format_output(out)