from __future__ import annotations import json import os import re import math from dataclasses import dataclass from safetensors.numpy import load_file from typing import Dict, List, Tuple, Optional import numpy as np # type: ignore from pathlib import Path BASE_DIR = Path(__file__).resolve() data = load_file("pipeowl.safetensors") @dataclass class PipeOwlConfig: """ 全域設定。 embeddings_path: 語義場的基底向量矩陣 (V, D) V = 詞彙數 D = 向量維度 delta_scalar_path: 每個 token 對應的一維場偏移量 (V,) 用來做 score 偏移(目前為靜態 bias) vocab_path: vocab list,必須與 embeddings 順序完全對齊。 index i <-> emb[i] <-> delta[i] alpha: base 相似度權重 beta: delta 權重(目前為 logit bias,不是動態 loss) top_k: retrieval 預設回傳數量 temperature: decode 階段採樣溫度 max_new_tokens: decode 最大生成長度 """ ROOT_DIR = BASE_DIR.parent vocab_path: str = str(ROOT_DIR / "tokenizer.json") normalize_rows: bool = False # True: enforce row-wise normalization for cosine==dot ensure_contiguous: bool = True # True: make emb contiguous for faster GEMV max_token_len_cap: int = 32 # cap tokenizer max token length to prevent slow path / garbage vocab #============================= alpha: float = 1 #============================= #============================= # scoring mode #============================= score_mode: str = "residual" # options: # "linear" -> α*base + β*delta + γ*syntax # "residual" -> α*base + (1 - α*base)*delta #============================= ##============================= ## "linear" ## score = α*base + β*delta + γ*syntax ##============================= beta: float = 0.05 ##gamma: float = 1.5 ##============================= ## if linear ## α=0.97 β=0.03 performance well ## α=1 β=0.00 just same as model ##============================= ##============================= ## "residual" ## score = α*base + (1 - α*base)*delta ##============================= ## if residual ## α=1 just same as model ## α=0.9 performance well ## α=0.5 find more meaning ##============================= ##============================= ## retrieval ##============================= top_k: int = 16 ##============================= ##============================= ## decode ##============================= temperature: float = 0.13 ##============================= ## temperature = 0.13 performance well ##============================= max_new_tokens: int = 64 ##============================= """ def softmax(scores, temperature=0.3): scores = np.array(scores) / temperature exp = np.exp(scores - np.max(scores)) return exp / exp.sum() """ def eval_token_nll(engine, text): tokens = engine.tokenizer.tokenize(text) if len(tokens) < 2: return float("inf") total_bits = 0.0 count = 0 for i in range(len(tokens) - 1): context = "".join(tokens[:i+1]) target_token = tokens[i+1] q = engine.encode(context) logits = engine.score_vocab(q) probs = engine.logits_to_probs(logits) idx = engine.token_to_id.get(target_token) p = float(probs[idx]) if idx is not None else 1e-9 p = max(p, 1e-9) total_bits += -math.log2(p) count += 1 return total_bits / count ## semanticizer class VocabTokenizer: """ 字串最大匹配 tokenizer。 設計目標: 將輸入文字拆成 vocab 中存在的 token。 方法: - 使用最大長度優先匹配 適用情境: vocab 是字 / 詞 級別,且已對齊 embedding。 """ def __init__(self, vocab_list, *, max_len_cap: Optional[int] = None): self.vocab_set = set(vocab_list) mx = max(len(t) for t in vocab_list) if max_len_cap is not None: mx = min(mx, int(max_len_cap)) self.max_len = mx def tokenize(self, text): text = text.lower().strip() tokens = [] i = 0 n = len(text) while i < n: matched = False for L in range(self.max_len, 0, -1): if i + L <= n: piece = text[i:i+L] if piece in self.vocab_set: tokens.append(piece) i += L matched = True break if not matched: # 🔥 fallback char(最後才做) tokens.append(text[i]) i += 1 return tokens class PipeOwlEngine: """ PipeOwl 幾何語義引擎核心。 設計哲學: index = 語義場座標 emb[i] -> 詞向量 delta[i] -> 詞的場偏移量 vocab[i] -> 詞本身 核心流程: text ↓ tokenize ↓ mean embedding ↓ score = alpha*base + beta*delta ↓ top-k ↓ decode 這是一個: Field-based retrieval language system """ def __init__(self, cfg: PipeOwlConfig): self.cfg = cfg #self.emb: np.ndarray = None # (V, D) float32 #self.delta: np.ndarray = None # (V,) float32 self.emb = data["embeddings"].astype(np.float32) self.delta = data["delta_field"].astype(np.float32) self.token_to_id: Dict[str, int] = {} self.id_to_token: List[str] = [] # Decoder (optional) self.decoder = MicroGPTDecoder() # inference-only stub; plug your trained weights later self._load_assets() # ------------------------- # asset loading # ------------------------- def _load_assets(self) -> None: """ 載入語義場資產。 載入內容: 1. embeddings (V, D) 2. delta scalar (V,) 3. vocab list (V,) 關鍵假設: 三者必須 index 完全對齊。 幾何意義: 每個 index i 對應語義空間中的一個固定場點。 """ if not os.path.exists(self.cfg.vocab_path): raise FileNotFoundError(self.cfg.vocab_path) emb = self.emb # embeddings: (V, D) if emb.dtype != np.float32: emb = emb.astype(np.float32, copy=False) # ChatGPT note: make C-contiguous for faster GEMV if self.cfg.ensure_contiguous and not emb.flags["C_CONTIGUOUS"]: emb = np.ascontiguousarray(emb) if self.cfg.normalize_rows: norms = np.linalg.norm(emb, axis=1, keepdims=True) + 1e-12 emb = emb / norms # delta: (V,) self.delta = data["delta_field"] if self.delta.dtype != np.float32: self.delta = self.delta.astype(np.float32, copy=False) if self.emb.ndim != 2: raise ValueError(f"embeddings must be 2D (V, D), got shape={self.emb.shape}") # (V, D) V, _ = self.emb.shape if self.delta.ndim != 1 or self.delta.shape[0] != V: raise ValueError(f"delta must be shape (V,), got {self.delta.shape}, expected ({V},)") # vocab json: build token_to_id and id_to_token with open(self.cfg.vocab_path, "r", encoding="utf-8-sig") as f: vocab_list = json.load(f) if not isinstance(vocab_list, list): raise ValueError("vocab must be a list for geometric field mode") if len(vocab_list) != V: raise ValueError(f"vocab size {len(vocab_list)} != embeddings V {V}") self.vocab = vocab_list self.id_to_token = vocab_list self.token_to_id = {t: i for i, t in enumerate(vocab_list)} self.tokenizer = VocabTokenizer(self.vocab) # ------------------------- # encode (from vector library) # ------------------------- def encode(self, text: str): """ 將文字投影到語義場中。 流程: 1. tokenize -> token list 2. 取每個 token 對應 emb 3. 做 mean pooling 4. normalize 數學形式: q = normalize( mean( emb[token_i] ) ) 幾何意義: 這是在語義場中求質心。 風險: - mean pooling 會削弱方向性 """ # ChatGPT note: exact token fast-path (prevents "貓頭鷹 = mean(貓,頭,鷹)" pollution) idx0 = self.token_to_id.get(text) if idx0 is not None: v = self.emb[idx0].astype(np.float32, copy=False) # emb rows already normalized if cfg.normalize_rows=True; keep safe anyway: v = v / (np.linalg.norm(v) + 1e-12) return v tokens = self.tokenizer.tokenize(text) if not tokens: return np.zeros(self.emb.shape[1], dtype=np.float32) vecs = [] wts = [] for t in tokens: idx = self.token_to_id.get(t) if idx is None: continue vecs.append(self.emb[idx]) wts.append(max(1, len(t))) if not vecs: return np.zeros(self.emb.shape[1], dtype=np.float32) vecs = np.stack(vecs, axis=0).astype(np.float32, copy=False) wts = np.asarray(wts, dtype=np.float32) q = np.average(vecs, axis=0, weights=wts) q /= (np.linalg.norm(q) + 1e-12) return q # ------------------------- # probs (decode) # ------------------------- def logits_to_probs(self, logits: np.ndarray, temperature: Optional[float] = None) -> np.ndarray: T = self.cfg.temperature if temperature is None else float(temperature) x = logits.astype(np.float64) / max(T, 1e-8) x = x - np.max(x) exp_x = np.exp(x) return (exp_x / np.sum(exp_x)).astype(np.float32) # ------------------------- # loss / scoring (delta) # ------------------------- def score_vocab(self, q: np.ndarray, alpha: Optional[float] = None, beta: Optional[float] = None) -> np.ndarray: """ 計算每個 vocab token 的場分數。 base: emb @ q 若 emb 與 q 已正規化,則為 cosine similarity。 delta: 每個 token 的靜態場偏移量。 目前語義: delta 是 logit bias。 不是 loss、不是 energy gradient。s """ a = self.cfg.alpha if alpha is None else float(alpha) b = self.cfg.beta if beta is None else float(beta) base = self.emb @ q if self.cfg.score_mode == "linear": score = a * base + b * self.delta elif self.cfg.score_mode == "residual": score = a * base + (1 - a * base) * self.delta else: raise ValueError(f"Unknown score_mode: {self.cfg.score_mode}") return score.astype(np.float32, copy=False) def topk(self, score: np.ndarray, k: Optional[int] = None) -> List[Tuple[str, float]]: """ 取前 k 高分 token。 使用 argpartition 提升效率。 回傳: [(token_string, score), ...] 幾何意義: 找出最接近 query 向量(含場偏移)的場點。 注意: score 可能 > 1(因為加入 delta)。 """ k = self.cfg.top_k if k is None else int(k) k = max(1, min(k, score.shape[0])) # argpartition for speed idx = np.argpartition(-score, k - 1)[:k] idx = idx[np.argsort(-score[idx])] out = [] for i in idx: tok = self.id_to_token[i] if i < len(self.id_to_token) else str(i) out.append((tok, float(score[i]))) return out # ------------------------- # decode (microgpt inference-only) # ------------------------- def decode(self, prompt_tokens: List[str]) -> str: """ Decode 階段。 目前行為: 將 top tokens 拼成 prompt 字串, 丟給 microgpt stub。 設計定位: retrieval 與 generation 分離。 現狀: microgpt 尚未接上真實權重, 目前只是 pipeline 占位。 """ prompt = " ".join([t for t in prompt_tokens if t]) return self.decoder.generate( prompt=prompt, temperature=self.cfg.temperature, max_new_tokens=self.cfg.max_new_tokens, ) # ------------------------- # one-shot pipeline # ------------------------- def pipeowl( self, text: str, *, top_k: Optional[int] = None, alpha: Optional[float] = None, beta: Optional[float] = None, temperature: Optional[float] = None, max_new_tokens: Optional[int] = None, ) -> Dict[str, object]: """ 單次完整 pipeline。 流程: text ↓ encode ↓ score_vocab ↓ topk ↓ decode 回傳: { "query": 原始文字, "retrieved": top-k token + 分數, "prompt": 用於 decode 的 token 串, "decoded": 生成結果 } 這是語義場查詢的一次完整觀測。 """ q = self.encode(text) s = self.score_vocab(q, alpha=alpha, beta=beta) retrieved = self.topk(s, k=top_k) # build a prompt from top tokens (simple & deterministic) prompt_tokens = [t for (t, _) in retrieved[: min(len(retrieved), 8)]] if temperature is not None: self.cfg.temperature = float(temperature) if max_new_tokens is not None: self.cfg.max_new_tokens = int(max_new_tokens) decoded = self.decode(prompt_tokens) return { "query": text, "retrieved": retrieved, "prompt": " ".join(prompt_tokens), "decoded": decoded, } # ---------------------------------------------------------------------- # microgpt inference-only stub # ---------------------------------------------------------------------- class MicroGPTDecoder: """ 推理階段占位 decoder。 設計目的: 讓 pipeline 可運行, 未來可替換為: - 已訓練 microGPT - 外部 LLM - 或場驅動 sampling 模型 現在只是 scaffold。 Inference-only placeholder. Why placeholder? - Your pasted microGPT file trains its own weights in-process. - For a real decode stage, you want: (A) load a trained state_dict from disk, OR (B) keep a tiny trained model in memory, OR (C) use microGPT purely as a sampler over a learned char vocab. This class is the stable interface. Plug your implementation later. """ def __init__(self): # If you already have trained weights, add: # self.state_dict = load(...) pass def generate(self, prompt: str, temperature: float = 0.8, max_new_tokens: int = 64) -> str: # Minimal safe fallback: return prompt as “decoded” scaffold. # Replace this with your microgpt forward+sampling once you have weights. # (This keeps the pipeline callable today.) return f"[microgpt_stub] {prompt}"