PipeOwl-1.2 / engine.py
WangKaiLin's picture
Upload 6 files
8e1d560 verified
from __future__ import annotations
import json
import os
import re
from dataclasses import dataclass
from safetensors.numpy import load_file
from typing import Dict, List, Tuple, Optional
import numpy as np # type: ignore
from pathlib import Path
# Absolute path of this module file. Despite the name this is the file itself,
# not a directory; use BASE_DIR.parent for the containing directory.
BASE_DIR = Path(__file__).resolve()

# Tensor archive loaded once at import time and shared by every engine
# instance. The copy that sits next to this module is preferred, because the
# vocabulary is resolved relative to the module as well and the two must stay
# index-aligned. If no such file exists, fall back to the original behaviour
# of resolving the name against the current working directory.
_BUNDLED_SAFETENSORS = BASE_DIR.parent / "pipeowl.safetensors"
data = load_file(
    str(_BUNDLED_SAFETENSORS)
    if _BUNDLED_SAFETENSORS.is_file()
    else "pipeowl.safetensors"
)
@dataclass
class PipeOwlConfig:
    """
    Global settings for the PipeOwl engine.

    Assets:
        safetensors_path:
            Tensor archive; defaults to ``pipeowl.safetensors`` next to this
            module. It is expected to hold the base embedding matrix (V, D),
            where V is the vocabulary size and D the vector dimension, and a
            one-dimensional per-token field offset (V,) used to shift scores
            (currently a static bias).
        vocab_path:
            Vocabulary list; defaults to ``vocabulary.json`` next to this
            module. Its order must match the embeddings exactly, so that
            index i names the same token in vocab, embeddings and delta.

    Performance toggles:
        normalize_rows:
            Enforce row-wise L2 normalization so that cosine equals dot.
        ensure_contiguous:
            Make the embedding matrix contiguous for faster matrix-vector
            products.
        max_token_len_cap:
            Upper bound on the tokenizer's maximum token length.

    Scoring (score = alpha * base + beta * delta_field):
        alpha:
            Weight of the base similarity.
        beta:
            Weight of delta (currently a logit bias, not a dynamic loss).

    Retrieval:
        top_k:
            Default number of results returned.

    Decode (placeholders):
        temperature:
            Sampling temperature for the decode stage.
        max_new_tokens:
            Maximum generation length for the decode stage.
    """

    # Directory containing this module. It carries no annotation, so it is a
    # plain class attribute rather than a dataclass field.
    ROOT_DIR = BASE_DIR.parent

    # --- asset locations ---
    safetensors_path: str = str(ROOT_DIR.joinpath("pipeowl.safetensors"))
    vocab_path: str = str(ROOT_DIR.joinpath("vocabulary.json"))

    # --- performance toggles ---
    normalize_rows: bool = True
    ensure_contiguous: bool = True
    max_token_len_cap: int = 32

    # --- scoring: score = alpha * base + beta * delta_field ---
    alpha: float = 1.0
    # Fixed value.
    beta: float = 0.05
    # Disabled "energy-style" alternative (not enabled), kept for reference:
    #     beta: Optional[float] = None
    #     def __post_init__(self):
    #         if self.beta is None:
    #             self.beta = 1.0 - self.alpha

    # --- retrieval ---
    top_k: int = 16

    # --- decode ---
    temperature: float = 0.8
    max_new_tokens: int = 64
## semanticizer
class VocabTokenizer:
    """
    Greedy longest-match string tokenizer.

    Goal:
        Split input text into tokens that exist in the vocabulary.

    Method:
        At each position the longest vocabulary entry is tried first.

    Intended use:
        Character- or word-level vocabularies that are already aligned with
        an embedding table.
    """

    # Compiled once: a run of ASCII letters is treated as one word.
    _LATIN_WORD = re.compile(r"[a-zA-Z]+")

    def __init__(self, vocab_list, *, max_len_cap: Optional[int] = None):
        """
        Build the lookup set and the longest-match window.

        Args:
            vocab_list: iterable of token strings. It is consumed exactly
                once, so a generator is accepted.
            max_len_cap: optional upper bound for the match window; tokens
                longer than this can never be produced by ``tokenize``.

        An empty vocabulary yields ``max_len == 0`` (``tokenize`` then returns
        no vocabulary tokens) instead of raising from ``max()``.
        """
        self.vocab_set = set(vocab_list)
        # Measure on the set: the input iterable may already be exhausted.
        longest = max(map(len, self.vocab_set), default=0)
        if max_len_cap is not None:
            longest = min(longest, int(max_len_cap))
        self.max_len = longest

    def tokenize(self, text):
        """
        Split ``text`` into tokens.

        The text is lower-cased first. If it contains any run of ASCII
        letters, only those runs are returned (whether or not they are in the
        vocabulary) and everything else in the text is ignored.
        NOTE(review): this drops the non-Latin part of mixed-script input;
        confirm that is intended.

        Otherwise the text is scanned left to right, taking the longest
        vocabulary entry starting at each position; characters that start no
        vocabulary entry are skipped.

        Returns:
            list of token strings (possibly empty).
        """
        text = text.lower()
        words = self._LATIN_WORD.findall(text)
        if words:
            return words

        tokens = []
        pos = 0
        end = len(text)
        while pos < end:
            # Never try a window that would run past the end of the text.
            for size in range(min(self.max_len, end - pos), 0, -1):
                piece = text[pos:pos + size]
                if piece in self.vocab_set:
                    tokens.append(piece)
                    pos += size
                    break
            else:
                # Nothing in the vocabulary starts here: skip one character.
                pos += 1
        return tokens
class PipeOwlEngine:
    """
    Core of the PipeOwl geometric semantic engine.

    Design idea: an index is a coordinate in the semantic field.
        emb[i]   -> token vector
        delta[i] -> field offset of the token
        vocab[i] -> the token itself

    Pipeline:
        text -> tokenize -> weighted mean embedding
             -> score = alpha * base + beta * delta -> top-k -> decode

    In short: a field-based retrieval language system.
    """

    # How many of the top retrieved tokens are used to build the decoder prompt.
    PROMPT_TOKEN_LIMIT = 8

    def __init__(self, cfg: "PipeOwlConfig"):
        """
        Bind the configuration, pick up the tensors and load the vocabulary.

        Args:
            cfg: engine configuration.

        Raises:
            FileNotFoundError: the vocabulary file does not exist.
            ValueError: tensors and vocabulary are not shape-aligned.
        """
        self.cfg = cfg
        # NOTE(review): the tensors come from the module-level `data` mapping
        # loaded at import time; cfg.safetensors_path is not consulted here.
        self.emb = data["embeddings"]
        self.delta = data["delta_field"]
        self.token_to_id: Dict[str, int] = {}
        self.id_to_token: List[str] = []
        # Decoder (optional): inference-only stub, real weights plug in later.
        self.decoder = MicroGPTDecoder()
        self._load_assets()

    # -------------------------
    # asset loading
    # -------------------------
    @staticmethod
    def _prepare_embeddings(emb, *, normalize_rows: bool, ensure_contiguous: bool) -> np.ndarray:
        """Return ``emb`` as float32, optionally row-normalized and C-contiguous."""
        if emb.dtype != np.float32:
            emb = emb.astype(np.float32, copy=False)
        if normalize_rows:
            # Normalize once so that a dot product with a unit query is a
            # cosine similarity (no per-query normalization needed).
            norms = np.linalg.norm(emb, axis=1, keepdims=True)
            # Leave all-zero rows untouched instead of dividing by zero.
            norms[norms == 0.0] = 1.0
            # Out-of-place division: the source buffer may be shared between
            # engine instances or be read-only.
            emb = emb / norms
        if ensure_contiguous and not emb.flags["C_CONTIGUOUS"]:
            emb = np.ascontiguousarray(emb)
        return emb

    def _load_assets(self) -> None:
        """
        Validate the tensors and load the vocabulary.

        Handles:
            1. embeddings (V, D): cast to float32, row-normalized when
               ``cfg.normalize_rows`` is set, made C-contiguous when
               ``cfg.ensure_contiguous`` is set, and stored on ``self.emb``
            2. delta scalar (V,): cast to float32
            3. vocab list (V,): read from ``cfg.vocab_path``

        Key assumption:
            all three are aligned by index; sizes are checked, order is not.

        Raises:
            FileNotFoundError: ``cfg.vocab_path`` does not exist.
            ValueError: wrong tensor rank, mismatched sizes, or a vocabulary
                that is not a JSON list.
        """
        if not os.path.exists(self.cfg.vocab_path):
            raise FileNotFoundError(self.cfg.vocab_path)

        # embeddings: (V, D). The rank must be checked before normalizing rows.
        if self.emb.ndim != 2:
            raise ValueError(f"embeddings must be 2D (V, D), got shape={self.emb.shape}")
        self.emb = self._prepare_embeddings(
            self.emb,
            normalize_rows=self.cfg.normalize_rows,
            ensure_contiguous=self.cfg.ensure_contiguous,
        )
        V = self.emb.shape[0]

        # delta: (V,)
        if self.delta.dtype != np.float32:
            self.delta = self.delta.astype(np.float32, copy=False)
        if self.delta.ndim != 1 or self.delta.shape[0] != V:
            raise ValueError(f"delta must be shape (V,), got {self.delta.shape}, expected ({V},)")

        # vocab json: build token_to_id and id_to_token
        # ("utf-8-sig" tolerates a leading byte-order mark).
        with open(self.cfg.vocab_path, "r", encoding="utf-8-sig") as f:
            vocab_list = json.load(f)
        if not isinstance(vocab_list, list):
            raise ValueError("vocab must be a list for geometric field mode")
        if len(vocab_list) != V:
            raise ValueError(f"vocab size {len(vocab_list)} != embeddings V {V}")

        self.vocab = vocab_list
        self.id_to_token = vocab_list
        self.token_to_id = {t: i for i, t in enumerate(vocab_list)}
        # Honour the configured cap on the tokenizer's match window.
        self.tokenizer = VocabTokenizer(self.vocab, max_len_cap=self.cfg.max_token_len_cap)

    # -------------------------
    # encode (from vector library)
    # -------------------------
    def encode(self, text: str):
        """
        Project text into the semantic field.

        Steps:
            1. If ``text`` is itself a vocabulary token, return its own
               normalized vector. This keeps a multi-character token from
               being replaced by the mean of its parts.
            2. Otherwise tokenize, look up each token's vector, and take a
               mean weighted by token length (longer tokens count more).
            3. L2-normalize the result.

        Formula:
            q = normalize( weighted_mean( emb[token_i] ) )

        Geometric meaning: the centroid of the tokens in the field.
        Risk: mean pooling weakens directionality.

        Returns:
            1-D float32 vector of length D. It is the zero vector when the
            text yields no token that is in the vocabulary.
        """
        exact_id = self.token_to_id.get(text)
        if exact_id is not None:
            vec = self.emb[exact_id].astype(np.float32, copy=False)
            # Rows may already be unit length; normalizing again is harmless.
            return vec / (np.linalg.norm(vec) + 1e-12)

        vecs = []
        weights = []
        for tok in self.tokenizer.tokenize(text):
            idx = self.token_to_id.get(tok)
            if idx is None:
                # The tokenizer may emit words that are not in the vocabulary.
                continue
            vecs.append(self.emb[idx])
            weights.append(max(1, len(tok)))

        if not vecs:
            # No tokens, or none known: np.stack would fail on an empty list.
            return np.zeros(self.emb.shape[1], dtype=np.float32)

        stacked = np.stack(vecs, axis=0).astype(np.float32, copy=False)
        q = np.average(stacked, axis=0, weights=np.asarray(weights, dtype=np.float32))
        q /= (np.linalg.norm(q) + 1e-12)
        return q

    # -------------------------
    # loss / scoring (delta)
    # -------------------------
    def score_vocab(self, q: np.ndarray, alpha: Optional[float] = None, beta: Optional[float] = None) -> np.ndarray:
        """
        Compute the field score of every vocabulary token.

        base:
            ``emb @ q``; a cosine similarity when both emb rows and q are
            normalized.
        delta:
            static per-token field offset. It currently acts as a logit bias,
            not as a loss or an energy gradient.

        Args:
            q: query vector of length D.
            alpha: weight of base; ``cfg.alpha`` when None.
            beta: weight of delta; ``cfg.beta`` when None.

        Returns:
            float32 array of shape (V,).
        """
        a = self.cfg.alpha if alpha is None else float(alpha)
        b = self.cfg.beta if beta is None else float(beta)
        base = self.emb @ q  # (V,)
        score = a * base + b * self.delta
        return score.astype(np.float32, copy=False)

    def topk(self, score: np.ndarray, k: Optional[int] = None) -> List[Tuple[str, float]]:
        """
        Return the k highest-scoring tokens, best first.

        Uses argpartition to avoid sorting the whole score vector.

        Args:
            score: 1-D score array.
            k: number of results; ``cfg.top_k`` when None. Clamped to the
                range [1, len(score)].

        Returns:
            ``[(token_string, score), ...]``; empty when ``score`` is empty.
            An index beyond the vocabulary is reported as its decimal string.

        Note:
            scores may exceed 1 because delta is added.
        """
        total = score.shape[0]
        if total == 0:
            # argpartition cannot select from an empty array.
            return []
        k = self.cfg.top_k if k is None else int(k)
        k = max(1, min(k, total))
        # Partition first, then order only the k selected entries.
        idx = np.argpartition(-score, k - 1)[:k]
        idx = idx[np.argsort(-score[idx])]
        vocab_len = len(self.id_to_token)
        return [
            (self.id_to_token[i] if i < vocab_len else str(i), float(score[i]))
            for i in map(int, idx)
        ]

    # -------------------------
    # decode (microgpt inference-only)
    # -------------------------
    def decode(
        self,
        prompt_tokens: List[str],
        *,
        temperature: Optional[float] = None,
        max_new_tokens: Optional[int] = None,
    ) -> str:
        """
        Decode stage.

        Joins the non-empty prompt tokens with spaces and hands the prompt to
        the decoder. Retrieval and generation are kept separate; the decoder
        is currently a stub without trained weights.

        Args:
            prompt_tokens: tokens to build the prompt from.
            temperature: per-call override; ``cfg.temperature`` when None.
            max_new_tokens: per-call override; ``cfg.max_new_tokens`` when None.

        Returns:
            whatever the decoder's ``generate`` returns.
        """
        prompt = " ".join(t for t in prompt_tokens if t)
        return self.decoder.generate(
            prompt=prompt,
            temperature=self.cfg.temperature if temperature is None else float(temperature),
            max_new_tokens=self.cfg.max_new_tokens if max_new_tokens is None else int(max_new_tokens),
        )

    # -------------------------
    # one-shot pipeline
    # -------------------------
    def pipeowl(
        self,
        text: str,
        *,
        top_k: Optional[int] = None,
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        temperature: Optional[float] = None,
        max_new_tokens: Optional[int] = None,
    ) -> Dict[str, object]:
        """
        Run the whole pipeline once: encode -> score_vocab -> topk -> decode.

        All keyword arguments are per-call overrides of the matching config
        values; they apply to this call only and never modify ``self.cfg``.

        Returns:
            {
                "query": the original text,
                "retrieved": top-k tokens with scores,
                "prompt": the token string used for decoding,
                "decoded": the generated result,
            }
        """
        q = self.encode(text)
        s = self.score_vocab(q, alpha=alpha, beta=beta)
        retrieved = self.topk(s, k=top_k)

        # Build a prompt from the top tokens (simple and deterministic).
        prompt_tokens = [tok for tok, _ in retrieved[: self.PROMPT_TOKEN_LIMIT]]

        # Forward only the overrides that were actually given.
        overrides = {}
        if temperature is not None:
            overrides["temperature"] = float(temperature)
        if max_new_tokens is not None:
            overrides["max_new_tokens"] = int(max_new_tokens)
        decoded = self.decode(prompt_tokens, **overrides)

        return {
            "query": text,
            "retrieved": retrieved,
            "prompt": " ".join(prompt_tokens),
            "decoded": decoded,
        }
# ----------------------------------------------------------------------
# microgpt inference-only stub
# ----------------------------------------------------------------------
class MicroGPTDecoder:
    """
    Inference-time placeholder decoder.

    Purpose:
        Keep the pipeline runnable today. It can later be swapped for
          - a trained microGPT,
          - an external LLM,
          - or a field-driven sampling model.
        For now it is only scaffolding.

    Why a placeholder?
        - The microGPT file this was derived from trains its own weights
          in-process.
        - A real decode stage should either
            (A) load a trained state_dict from disk, or
            (B) keep a tiny trained model in memory, or
            (C) use microGPT purely as a sampler over a learned char vocab.

    This class is the stable interface; plug the implementation in later.
    """

    def __init__(self):
        """Create the stub; there is no state to set up yet."""
        # Once trained weights exist, load them here, e.g.:
        #     self.state_dict = load(...)

    def generate(self, prompt: str, temperature: float = 0.8, max_new_tokens: int = 64) -> str:
        """
        Return the prompt tagged as stub output.

        ``temperature`` and ``max_new_tokens`` are accepted so the signature
        matches a real decoder, but the stub does not use them. Replace the
        body with a microGPT forward pass plus sampling once weights exist.
        """
        stub_output = f"[microgpt_stub] {prompt}"
        return stub_output