from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
from safetensors.numpy import load_file

# Resolve asset paths relative to this file so the module works from any
# working directory.
BASE_DIR = Path(__file__).resolve()

# Load the fp16 tensor bundle once at import time; the engine casts tensors
# to float32 where needed.
data = load_file(str(BASE_DIR.parent / "pipeowl_fp16.safetensors"))


@dataclass
class PipeOwlConfig:
    """
    Global configuration.

    embeddings (loaded from the safetensors bundle):
        base vector matrix of the semantic field, shape (V, D)
        V = vocabulary size
        D = embedding dimension

    delta_field (loaded from the safetensors bundle):
        one scalar field offset per token, shape (V,)
        used to bias the score (currently a static bias)

    vocab_path:
        vocab list; must be aligned with the embeddings exactly:
        index i <-> emb[i] <-> delta[i]

    alpha:
        weight of the base similarity

    beta:
        weight of delta (currently a logit bias, not a dynamic loss)

    top_k:
        default number of results returned by retrieval

    Placeholders:
        temperature:
            sampling temperature for the decode stage
        max_new_tokens:
            maximum generation length for decode
    """
    # Class-level constant; it has no annotation, so it is not a dataclass field.
    ROOT_DIR = BASE_DIR.parent
    vocab_path: str = str(ROOT_DIR / "vocabulary.json")

    normalize_rows: bool = True
    ensure_contiguous: bool = True
    max_token_len_cap: int = 32

    alpha: float = 0.96

    beta: float = 0.04

    """Energy-style weighting, not enabled yet:
    beta: Optional[float] = None
    def __post_init__(self):
        if self.beta is None:
            self.beta = 1.0 - self.alpha
    """

    top_k: int = 16

    temperature: float = 0.8
    max_new_tokens: int = 64

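
# A minimal sketch of the disabled "energy-style" weighting above, assuming
# the intent is that alpha and beta always sum to 1. This helper is purely
# illustrative (hypothetical; nothing in the engine calls it):
def _complementary_beta(alpha: float, beta: Optional[float] = None) -> float:
    """Return beta, defaulting to the complement 1 - alpha when unset."""
    return (1.0 - alpha) if beta is None else float(beta)
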

class VocabTokenizer:
    """
    Longest-match string tokenizer.

    Design goal:
        split input text into tokens that exist in the vocab.

    Method:
        - longest-match-first (greedy)

    Intended use:
        the vocab is character / word level and already aligned with the
        embeddings.
    """
    def __init__(self, vocab_list, *, max_len_cap: Optional[int] = None):
        self.vocab_set = set(vocab_list)

        # The longest vocab entry bounds the match window; cap it so one
        # oversized entry cannot force every lookup to scan far ahead.
        mx = max((len(t) for t in vocab_list), default=1)
        if max_len_cap is not None:
            mx = min(mx, int(max_len_cap))
        self.max_len = mx

    """Fallback tokenizer kept from the old pipeline upgrade:
    def tokenize(self, text: str):
        tokens = []
        i = 0
        n = len(text)

        while i < n:
            matched = False
            for L in range(self.max_len, 0, -1):
                if i + L <= n:
                    piece = text[i:i+L]
                    if piece in self.vocab_set:
                        tokens.append(piece)
                        i += L
                        matched = True
                        break
            if not matched:
                i += 1
        return tokens
    """

    def tokenize(self, text):
        text = text.lower()

        # Latin-script input short-circuits to simple word splitting.
        words = re.findall(r"[a-zA-Z]+", text)
        if words:
            return words

        # Otherwise: greedy longest-match against the vocab.
        tokens = []
        i = 0
        n = len(text)

        while i < n:
            matched = False
            for L in range(self.max_len, 0, -1):
                if i + L <= n:
                    piece = text[i:i+L]
                    if piece in self.vocab_set:
                        tokens.append(piece)
                        i += L
                        matched = True
                        break
            if not matched:
                # No vocab entry starts at this position; skip one character.
                i += 1

        return tokens

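
# A minimal usage sketch of VocabTokenizer. The toy vocab is hypothetical
# (the real one comes from vocabulary.json); it shows the greedy pass
# preferring the longest entry and Latin text taking the word-splitting path.
def _demo_tokenizer() -> List[str]:
    toy = VocabTokenizer(["貓", "貓頭鷹", "飛"])
    assert toy.tokenize("貓頭鷹飛") == ["貓頭鷹", "飛"]  # longest match beats "貓"
    assert toy.tokenize("The Owl") == ["the", "owl"]  # regex word fallback
    return toy.tokenize("貓頭鷹飛")
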

class PipeOwlEngine:
    """
    Core of the PipeOwl geometric semantic engine.

    Design philosophy:
        index = coordinate in the semantic field

        emb[i]   -> token vector
        delta[i] -> the token's field offset
        vocab[i] -> the token itself

    Core flow:
        text
          ↓
        tokenize
          ↓
        mean embedding
          ↓
        score = alpha*base + beta*delta
          ↓
        top-k
          ↓
        decode

    This is a field-based retrieval language system.
    """

    def __init__(self, cfg: PipeOwlConfig):
        self.cfg = cfg

        self.emb = data["embeddings"].astype(np.float32)
        self.delta = data["delta_field"].astype(np.float32)
        self.token_to_id: Dict[str, int] = {}
        self.id_to_token: List[str] = []

        self.decoder = MicroGPTDecoder()

        self._load_assets()

    def _load_assets(self) -> None:
        """
        Load the semantic-field assets.

        Loaded content:
            1. embeddings (V, D)
            2. delta scalar (V,)
            3. vocab list (V,)

        Key assumption:
            all three are index-aligned.

        Geometric meaning:
            every index i corresponds to one fixed field point in semantic
            space.
        """
        if not os.path.exists(self.cfg.vocab_path):
            raise FileNotFoundError(self.cfg.vocab_path)

        emb = self.emb

        if emb.dtype != np.float32:
            emb = emb.astype(np.float32, copy=False)

        if self.cfg.ensure_contiguous and not emb.flags["C_CONTIGUOUS"]:
            emb = np.ascontiguousarray(emb)

        if self.cfg.normalize_rows:
            # Materialize memmapped arrays before mutating them, then
            # L2-normalize each row so that emb @ q is a cosine similarity.
            if isinstance(emb, np.memmap):
                emb = np.array(emb, copy=True)
            emb /= (np.linalg.norm(emb, axis=1, keepdims=True) + 1e-12)

        self.emb = emb

        if self.delta.dtype != np.float32:
            self.delta = self.delta.astype(np.float32, copy=False)

        if self.emb.ndim != 2:
            raise ValueError(f"embeddings must be 2D (V, D), got shape={self.emb.shape}")

        V, _ = self.emb.shape

        if self.delta.ndim != 1 or self.delta.shape[0] != V:
            raise ValueError(f"delta must be shape (V,), got {self.delta.shape}, expected ({V},)")

        with open(self.cfg.vocab_path, "r", encoding="utf-8-sig") as f:
            vocab_list = json.load(f)

        if not isinstance(vocab_list, list):
            raise ValueError("vocab must be a list for geometric field mode")

        if len(vocab_list) != V:
            raise ValueError(f"vocab size {len(vocab_list)} != embeddings V {V}")

        self.vocab = vocab_list
        self.id_to_token = vocab_list
        self.token_to_id = {t: i for i, t in enumerate(vocab_list)}

        self.tokenizer = VocabTokenizer(self.vocab, max_len_cap=self.cfg.max_token_len_cap)

    def encode(self, text: str):
        """
        Project text into the semantic field.

        Flow:
            1. tokenize -> token list
            2. look up emb for each token
            3. mean pooling
            4. normalize

        Math:
            q = normalize( mean( emb[token_i] ) )

        Geometric meaning:
            this computes a centroid in the semantic field.

        Risk:
            - mean pooling weakens directionality
        """
        """Old code, kept from an experiment on pulling 貓頭鷹 and 鴞 closer together:
        tokens = self.tokenizer.tokenize(text)

        vecs = []
        for t in tokens:
            idx = self.token_to_id[t]
            vecs.append(self.emb[idx])

        if not vecs:
            return np.zeros(self.emb.shape[1], dtype=np.float32)

        q = np.mean(vecs, axis=0)
        q /= (np.linalg.norm(q) + 1e-12)
        return q
        """

        # Fast path: the whole input is a single vocab token.
        idx0 = self.token_to_id.get(text)
        if idx0 is not None:
            v = self.emb[idx0].astype(np.float32, copy=False)
            v = v / (np.linalg.norm(v) + 1e-12)
            return v

        tokens = self.tokenizer.tokenize(text)
        if not tokens:
            return np.zeros(self.emb.shape[1], dtype=np.float32)

        vecs = []
        wts = []
        for t in tokens:
            idx = self.token_to_id.get(t)
            if idx is None:
                # Token not in the vocab (e.g. from the Latin word fallback).
                continue
            vecs.append(self.emb[idx])
            # Length-weighted pooling: longer tokens carry more weight.
            wts.append(max(1, len(t)))

        if not vecs:
            # Every token fell outside the vocab; return the zero vector.
            return np.zeros(self.emb.shape[1], dtype=np.float32)

        vecs = np.stack(vecs, axis=0).astype(np.float32, copy=False)
        wts = np.asarray(wts, dtype=np.float32)
        q = np.average(vecs, axis=0, weights=wts)
        q /= (np.linalg.norm(q) + 1e-12)
        return q

    def score_vocab(self, q: np.ndarray, alpha: Optional[float] = None, beta: Optional[float] = None) -> np.ndarray:
        """
        Compute the field score of every vocab token.

        base:
            emb @ q
            If emb and q are normalized, this is cosine similarity.

        delta:
            the static field offset of each token.

        Current semantics:
            delta is a logit bias,
            not a loss and not an energy gradient.
        """
        a = self.cfg.alpha if alpha is None else float(alpha)
        b = self.cfg.beta if beta is None else float(beta)

        base = self.emb @ q
        score = a * base + b * self.delta
        return score.astype(np.float32, copy=False)

    def topk(self, score: np.ndarray, k: Optional[int] = None) -> List[Tuple[str, float]]:
        """
        Take the k highest-scoring tokens.

        Uses argpartition for efficiency.

        Returns:
            [(token_string, score), ...]

        Geometric meaning:
            find the field points closest to the query vector
            (field offset included).

        Note:
            scores can exceed 1 (because delta is added in).
        """
        k = self.cfg.top_k if k is None else int(k)
        k = max(1, min(k, score.shape[0]))

        # argpartition selects the top k in O(V); only those k get sorted.
        idx = np.argpartition(-score, k - 1)[:k]
        idx = idx[np.argsort(-score[idx])]

        out = []
        for i in idx:
            tok = self.id_to_token[i] if i < len(self.id_to_token) else str(i)
            out.append((tok, float(score[i])))
        return out

    def decode(self, prompt_tokens: List[str]) -> str:
        """
        Decode stage.

        Current behavior:
            join the top tokens into a prompt string
            and hand it to the microgpt stub.

        Design intent:
            retrieval and generation stay separate.

        Status:
            microgpt is not wired to real weights yet;
            this is only a pipeline placeholder.
        """
        prompt = " ".join([t for t in prompt_tokens if t])
        return self.decoder.generate(
            prompt=prompt,
            temperature=self.cfg.temperature,
            max_new_tokens=self.cfg.max_new_tokens,
        )

    def pipeowl(
        self,
        text: str,
        *,
        top_k: Optional[int] = None,
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        temperature: Optional[float] = None,
        max_new_tokens: Optional[int] = None,
    ) -> Dict[str, object]:
        """
        One full pipeline pass.

        Flow:
            text
              ↓
            encode
              ↓
            score_vocab
              ↓
            topk
              ↓
            decode

        Returns:
            {
                "query": the original text,
                "retrieved": top-k tokens with scores,
                "prompt": the token string fed to decode,
                "decoded": the generated result,
            }

        This is one complete observation of the semantic-field query.
        """
        q = self.encode(text)
        s = self.score_vocab(q, alpha=alpha, beta=beta)
        retrieved = self.topk(s, k=top_k)

        # Cap the prompt at the 8 best tokens.
        prompt_tokens = [t for (t, _) in retrieved[: min(len(retrieved), 8)]]
        if temperature is not None:
            self.cfg.temperature = float(temperature)
        if max_new_tokens is not None:
            self.cfg.max_new_tokens = int(max_new_tokens)

        decoded = self.decode(prompt_tokens)
        return {
            "query": text,
            "retrieved": retrieved,
            "prompt": " ".join(prompt_tokens),
            "decoded": decoded,
        }

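
# A numpy-only sketch of the pooling math in encode, assuming normalized
# embedding rows. The toy vectors are hypothetical; this mirrors the
# length-weighted average followed by L2 normalization.
def _sketch_encode_pooling() -> np.ndarray:
    vecs = np.asarray([[1.0, 0.0], [0.0, 1.0]], dtype=np.float32)  # two token vectors
    wts = np.asarray([3.0, 1.0], dtype=np.float32)  # token lengths 3 and 1
    q = np.average(vecs, axis=0, weights=wts)  # weighted centroid [0.75, 0.25]
    q /= (np.linalg.norm(q) + 1e-12)  # back onto the unit sphere
    return q  # ~[0.949, 0.316]
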
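
# A toy instance of the scoring rule score = alpha*base + beta*delta from
# score_vocab. All values here are made up; with normalized rows, base is
# the cosine similarity of every vocab vector against the query.
def _sketch_score() -> np.ndarray:
    emb = np.asarray([[1.0, 0.0], [0.0, 1.0], [0.6, 0.8]], dtype=np.float32)  # (V=3, D=2)
    delta = np.asarray([0.0, 0.5, -0.2], dtype=np.float32)  # static per-token bias
    q = np.asarray([1.0, 0.0], dtype=np.float32)  # unit-norm query
    base = emb @ q  # cosines: [1.0, 0.0, 0.6]
    return 0.96 * base + 0.04 * delta  # [0.96, 0.02, 0.568]
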
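
# The argpartition pattern used by topk, on a toy score vector: the partition
# pulls the k best entries to the front in O(V), then only those k are sorted.
def _sketch_topk(k: int = 2) -> np.ndarray:
    score = np.asarray([0.1, 0.9, 0.4, 0.7], dtype=np.float32)
    idx = np.argpartition(-score, k - 1)[:k]  # the k best indices, unordered
    idx = idx[np.argsort(-score[idx])]  # sort just those k, descending
    return idx  # array([1, 3])

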
class MicroGPTDecoder:
    """
    Placeholder decoder for the inference stage.

    Purpose:
        keep the pipeline runnable;
        later it can be swapped for:
        - a trained microGPT
        - an external LLM
        - or a field-driven sampling model

    For now it is only scaffolding.

    Inference-only placeholder.

    Why placeholder?
    - Your pasted microGPT file trains its own weights in-process.
    - For a real decode stage, you want:
      (A) load a trained state_dict from disk, OR
      (B) keep a tiny trained model in memory, OR
      (C) use microGPT purely as a sampler over a learned char vocab.

    This class is the stable interface. Plug your implementation later.
    """

    def __init__(self):
        pass

    def generate(self, prompt: str, temperature: float = 0.8, max_new_tokens: int = 64) -> str:
        # Echo stub: real sampling gets plugged in here later; temperature and
        # max_new_tokens are accepted but unused for now.
        return f"[microgpt_stub] {prompt}"

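
# Minimal end-to-end usage, assuming pipeowl_fp16.safetensors and
# vocabulary.json sit next to this file; the query string is just an example.
if __name__ == "__main__":
    engine = PipeOwlEngine(PipeOwlConfig())
    result = engine.pipeowl("owl", top_k=8)
    print(result["retrieved"])
    print(result["decoded"])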