from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np  # type: ignore

# NOTE: despite its name this is the resolved path of this *file*; the
# directory that holds the assets is ``BASE_DIR.parent``.
BASE_DIR = Path(__file__).resolve()

# Tensor files already read from disk, keyed by absolute path, so several
# engines built from the same file share a single load.
_TENSOR_CACHE: Dict[str, Dict[str, np.ndarray]] = {}


def _load_tensors(path) -> Dict[str, np.ndarray]:
    """Load a safetensors file once per absolute path and memoize the result.

    The ``safetensors`` import is done here rather than at module level so
    that importing this module performs no file I/O and does not depend on
    the current working directory.
    """
    key = os.path.abspath(os.fspath(path))
    if key not in _TENSOR_CACHE:
        from safetensors.numpy import load_file

        _TENSOR_CACHE[key] = load_file(key)
    return _TENSOR_CACHE[key]


def __getattr__(name: str):
    """Lazily provide the legacy module attribute ``data`` (PEP 562).

    ``data`` used to be loaded eagerly at import time from a path relative to
    the working directory. It is now resolved on first access from the
    default ``PipeOwlConfig.safetensors_path``.
    """
    if name == "data":
        return _load_tensors(PipeOwlConfig.safetensors_path)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


@dataclass
class PipeOwlConfig:
    """Global configuration for the PipeOwl engine.

    Assets:
        safetensors_path: tensor file read by the engine. It must contain
            "embeddings" (V, D) and "delta_field" (V,), where V is the
            vocabulary size and D the vector dimension.
        vocab_path: JSON list of V tokens, index-aligned with the tensors:
            index i <-> embeddings[i] <-> delta_field[i].

    Performance toggles:
        normalize_rows: L2-normalize embedding rows once at load time so
            that a dot product with a unit query is a cosine similarity.
        ensure_contiguous: make the embedding matrix C-contiguous.
        max_token_len_cap: upper bound on the tokenizer's match length.

    Scoring (score = alpha * base + beta * delta_field):
        alpha: weight of the base similarity.
        beta: weight of the delta field (a static logit bias, not a loss).

    Retrieval:
        top_k: default number of retrieved tokens.

    Decode (placeholders until a real decoder is plugged in):
        temperature: sampling temperature.
        max_new_tokens: maximum generated length.
    """

    # Plain class attribute (not a dataclass field): directory of this file.
    ROOT_DIR = BASE_DIR.parent

    safetensors_path: str = str(ROOT_DIR / "pipeowl.safetensors")
    vocab_path: str = str(ROOT_DIR / "vocabulary.json")

    # perf toggles
    normalize_rows: bool = True
    ensure_contiguous: bool = True
    max_token_len_cap: int = 32

    # scoring: score = alpha * base + beta * delta_field
    alpha: float = 1.0  # fixed value
    beta: float = 0.05
    # Energy-style variant (not enabled): declare ``beta`` as
    # Optional[float] = None and derive it in __post_init__ as
    # ``1.0 - alpha`` when it is None.

    # retrieval
    top_k: int = 16

    # decode
    temperature: float = 0.8
    max_new_tokens: int = 64


# semanticizer
class VocabTokenizer:
    """Greedy longest-match tokenizer over a fixed vocabulary.

    Text is split into pieces that exist in the vocabulary, always trying
    the longest candidate first. Intended for character/word level
    vocabularies that are index-aligned with an embedding matrix.

    Args:
        vocab_list: iterable of token strings.
        max_len_cap: optional upper bound on the match length, so that a few
            very long vocabulary entries cannot slow down every scan.
    """

    _ASCII_WORD_RE = re.compile(r"[a-zA-Z]+")

    def __init__(self, vocab_list, *, max_len_cap: Optional[int] = None):
        self.vocab_set = set(vocab_list)
        # ``default=0`` keeps an empty vocabulary from raising ValueError;
        # measuring the set also works when ``vocab_list`` is a one-shot
        # iterator.
        longest = max((len(tok) for tok in self.vocab_set), default=0)
        if max_len_cap is not None:
            longest = min(longest, int(max_len_cap))
        self.max_len = longest

    def tokenize(self, text) -> List[str]:
        """Split ``text`` into tokens.

        The text is lower-cased first. If it contains any run of ASCII
        letters, those runs are returned as the tokens and nothing else is
        scanned.
        NOTE(review): for mixed input this drops every non-ASCII character;
        kept as-is because callers may rely on it.

        Otherwise the text is scanned left to right with greedy longest
        match against the vocabulary; characters that start no vocabulary
        entry are skipped.
        """
        text = text.lower()
        words = self._ASCII_WORD_RE.findall(text)
        if words:
            return words

        tokens: List[str] = []
        vocab = self.vocab_set
        pos, end = 0, len(text)
        while pos < end:
            # Never try a window that would run past the end of the text.
            for size in range(min(self.max_len, end - pos), 0, -1):
                piece = text[pos:pos + size]
                if piece in vocab:
                    tokens.append(piece)
                    pos += size
                    break
            else:
                pos += 1  # no vocabulary entry starts here
        return tokens


class PipeOwlEngine:
    """Core of the PipeOwl geometric retrieval engine.

    Design: an index is a coordinate in the semantic field.
        emb[i]   -> token vector
        delta[i] -> scalar field offset of the token
        vocab[i] -> the token itself

    Pipeline:
        text -> tokenize -> weighted mean embedding
             -> score = alpha * base + beta * delta -> top-k -> decode
    """

    # Number of top retrieved tokens used to build the decode prompt.
    _PROMPT_TOKEN_LIMIT = 8

    def __init__(self, cfg: PipeOwlConfig):
        self.cfg = cfg

        # Read from the configured path (previously a module-level load from
        # the working directory, which ignored cfg.safetensors_path).
        tensors = _load_tensors(cfg.safetensors_path)
        self.emb = tensors["embeddings"]  # (V, D)
        self.delta = tensors["delta_field"]  # (V,)

        self.token_to_id: Dict[str, int] = {}
        self.id_to_token: List[str] = []

        # Decoder (optional): inference-only stub defined later in this file.
        self.decoder = MicroGPTDecoder()

        self._load_assets()

    # -------------------------
    # asset loading
    # -------------------------
    def _prepare_embeddings(self, emb) -> np.ndarray:
        """Return ``emb`` as a float32 (V, D) matrix, honouring the cfg toggles.

        Raises:
            ValueError: if ``emb`` is not two-dimensional.
        """
        emb = np.asarray(emb)
        if emb.ndim != 2:
            raise ValueError(f"embeddings must be 2D (V, D), got shape={emb.shape}")
        if emb.dtype != np.float32:
            emb = emb.astype(np.float32, copy=False)
        if self.cfg.normalize_rows:
            # Normalize once so cosine == dot at query time. The division
            # allocates a new array, so the cached tensor is never mutated;
            # all-zero rows stay zero instead of becoming NaN.
            norms = np.linalg.norm(emb, axis=1, keepdims=True)
            emb = emb / np.maximum(norms, 1e-12)
        if self.cfg.ensure_contiguous and not emb.flags["C_CONTIGUOUS"]:
            emb = np.ascontiguousarray(emb)
        return emb

    def _load_assets(self) -> None:
        """Validate the tensors set by ``__init__`` and load the vocabulary.

        Embeddings (V, D), delta (V,) and the vocab list (V entries) must be
        index-aligned. On success this sets ``emb``, ``delta``, ``vocab``,
        ``id_to_token``, ``token_to_id`` and ``tokenizer``.

        Raises:
            FileNotFoundError: if ``cfg.vocab_path`` does not exist.
            ValueError: on any shape, type or size mismatch.
        """
        if not os.path.exists(self.cfg.vocab_path):
            raise FileNotFoundError(self.cfg.vocab_path)

        # Store the prepared matrix back; it used to be computed and dropped.
        self.emb = self._prepare_embeddings(self.emb)
        vocab_size = self.emb.shape[0]

        delta = np.asarray(self.delta)
        if delta.dtype != np.float32:
            delta = delta.astype(np.float32, copy=False)
        if delta.ndim != 1 or delta.shape[0] != vocab_size:
            raise ValueError(f"delta must be shape (V,), got {delta.shape}, expected ({vocab_size},)")
        self.delta = delta

        # "utf-8-sig" tolerates a leading BOM in the JSON file.
        with open(self.cfg.vocab_path, "r", encoding="utf-8-sig") as f:
            vocab_list = json.load(f)

        if not isinstance(vocab_list, list):
            raise ValueError("vocab must be a list for geometric field mode")
        if len(vocab_list) != vocab_size:
            raise ValueError(f"vocab size {len(vocab_list)} != embeddings V {vocab_size}")

        self.vocab = vocab_list
        self.id_to_token = vocab_list
        self.token_to_id = {t: i for i, t in enumerate(vocab_list)}
        # Apply the configured cap; it used to be defined but never used.
        self.tokenizer = VocabTokenizer(self.vocab, max_len_cap=self.cfg.max_token_len_cap)

    # -------------------------
    # encode (from vector library)
    # -------------------------
    def encode(self, text: str) -> np.ndarray:
        """Project ``text`` into the semantic field as a unit vector.

        1. If ``text`` is itself a vocabulary token, return its (re-normalized)
           embedding directly, so a multi-character word is not replaced by
           the mean of its parts.
        2. Otherwise tokenize, look up each known token and take the
           length-weighted mean (longer tokens weigh more), then normalize:
           q = normalize(average(emb[token_i], weights=len(token_i))).

        Returns:
            A float32 vector of length D. It is all zeros when the text
            yields no token that exists in the vocabulary.
        """
        exact_id = self.token_to_id.get(text)
        if exact_id is not None:
            v = self.emb[exact_id].astype(np.float32, copy=False)
            # Rows are normalized when cfg.normalize_rows is True; keep safe.
            return v / (np.linalg.norm(v) + 1e-12)

        ids: List[int] = []
        weights: List[int] = []
        for tok in self.tokenizer.tokenize(text):
            idx = self.token_to_id.get(tok)
            if idx is None:
                continue  # e.g. an ASCII word that is not in the vocabulary
            ids.append(idx)
            weights.append(max(1, len(tok)))

        if not ids:
            # No tokens at all, or none known: np.stack([]) would raise.
            return np.zeros(self.emb.shape[1], dtype=np.float32)

        vecs = np.stack([self.emb[i] for i in ids], axis=0).astype(np.float32, copy=False)
        q = np.average(vecs, axis=0, weights=np.asarray(weights, dtype=np.float32))
        q /= (np.linalg.norm(q) + 1e-12)
        return q

    # -------------------------
    # loss / scoring (delta)
    # -------------------------
    def score_vocab(self, q: np.ndarray, alpha: Optional[float] = None, beta: Optional[float] = None) -> np.ndarray:
        """Score every vocabulary token against the query vector ``q``.

        score = alpha * (emb @ q) + beta * delta

        ``emb @ q`` is a cosine similarity when both are normalized. ``delta``
        is a static per-token bias (not a loss or an energy gradient).
        ``alpha`` / ``beta`` default to the configured values when None.

        Returns:
            A float32 array of shape (V,).
        """
        a = self.cfg.alpha if alpha is None else float(alpha)
        b = self.cfg.beta if beta is None else float(beta)
        base = self.emb @ q  # (V,)
        score = a * base + b * self.delta
        return score.astype(np.float32, copy=False)

    def topk(self, score: np.ndarray, k: Optional[int] = None) -> List[Tuple[str, float]]:
        """Return the ``k`` highest scoring tokens, best first.

        ``k`` defaults to ``cfg.top_k`` and is clamped to [1, len(score)].
        Scores can exceed 1 because the delta bias is added.

        Returns:
            [(token_string, score), ...]; an empty list for an empty
            ``score`` vector.
        """
        total = score.shape[0]
        if total == 0:
            return []  # argpartition cannot select from nothing
        k = self.cfg.top_k if k is None else int(k)
        k = max(1, min(k, total))

        # argpartition selects the k best without a full sort; only those k
        # are then ordered.
        idx = np.argpartition(-score, k - 1)[:k]
        idx = idx[np.argsort(-score[idx])]

        known = len(self.id_to_token)
        out: List[Tuple[str, float]] = []
        for i in idx:
            i = int(i)
            tok = self.id_to_token[i] if i < known else str(i)
            out.append((tok, float(score[i])))
        return out

    # -------------------------
    # decode (microgpt inference-only)
    # -------------------------
    def decode(
        self,
        prompt_tokens: List[str],
        *,
        temperature: Optional[float] = None,
        max_new_tokens: Optional[int] = None,
    ) -> str:
        """Join the non-empty tokens with spaces and pass them to the decoder.

        Retrieval and generation are kept separate; the decoder is currently
        a placeholder.

        Args:
            prompt_tokens: tokens used to build the prompt.
            temperature: per-call override; ``cfg.temperature`` when None.
            max_new_tokens: per-call override; ``cfg.max_new_tokens`` when None.
        """
        prompt = " ".join(t for t in prompt_tokens if t)
        return self.decoder.generate(
            prompt=prompt,
            temperature=self.cfg.temperature if temperature is None else float(temperature),
            max_new_tokens=self.cfg.max_new_tokens if max_new_tokens is None else int(max_new_tokens),
        )

    # -------------------------
    # one-shot pipeline
    # -------------------------
    def pipeowl(
        self,
        text: str,
        *,
        top_k: Optional[int] = None,
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        temperature: Optional[float] = None,
        max_new_tokens: Optional[int] = None,
    ) -> Dict[str, object]:
        """Run the full pipeline once: encode -> score_vocab -> topk -> decode.

        All keyword arguments are per-call overrides of the configuration;
        none of them modifies ``self.cfg``.

        Returns:
            {
                "query": the original text,
                "retrieved": top-k (token, score) pairs,
                "prompt": the space-joined tokens used for decoding,
                "decoded": the decoder output,
            }
        """
        q = self.encode(text)
        scores = self.score_vocab(q, alpha=alpha, beta=beta)
        retrieved = self.topk(scores, k=top_k)

        # Build the prompt from the best tokens (simple and deterministic).
        prompt_tokens = [tok for tok, _ in retrieved[: self._PROMPT_TOKEN_LIMIT]]

        # Overrides are handed to decode() instead of being written into the
        # shared config, so they do not leak into later calls.
        decoded = self.decode(
            prompt_tokens,
            temperature=temperature,
            max_new_tokens=max_new_tokens,
        )

        return {
            "query": text,
            "retrieved": retrieved,
            "prompt": " ".join(prompt_tokens),
            "decoded": decoded,
        }


# ----------------------------------------------------------------------
# microgpt inference-only stub
# ----------------------------------------------------------------------
class MicroGPTDecoder:
    """Inference-only placeholder decoder.

    It exists so the pipeline can run end to end today. It is only a
    scaffold and can later be replaced by a trained microGPT, an external
    LLM, or a field-driven sampling model.

    Why a placeholder: the pasted microGPT file trains its own weights
    in-process, whereas a real decode stage should either
      (A) load a trained state_dict from disk,
      (B) keep a tiny trained model in memory, or
      (C) use microGPT purely as a sampler over a learned char vocab.

    This class is the stable interface; plug the implementation in later.
    """

    def __init__(self) -> None:
        """Create the stub; there is no state to set up yet."""
        # Once trained weights exist, load them here, e.g.
        # self.state_dict = load(...)

    def generate(self, prompt: str, temperature: float = 0.8, max_new_tokens: int = 64) -> str:
        """Return ``prompt`` prefixed with a stub marker.

        ``temperature`` and ``max_new_tokens`` are accepted so the interface
        is stable, but the stub does not use them. Replace the body with a
        real forward pass plus sampling once weights are available.
        """
        decoded = f"[microgpt_stub] {prompt}"
        return decoded