| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from __future__ import annotations |
| | import numpy as np |
| | from typing import Dict, List, Optional, Tuple, TYPE_CHECKING |
| | from collections import defaultdict, Counter |
| | from dataclasses import dataclass, field |
| |
|
| | if TYPE_CHECKING: |
| | from .haze import Vocab |
| |
|
| |
|
@dataclass
class CooccurField:
    """
    Co-occurrence field for corpus-biased generation.

    Tracks:
    - Bigram counts: P(token_j | token_i)
    - Trigram counts: P(token_k | token_i, token_j)
    - Co-occurrence within window: which tokens appear near each other

    Uses these statistics to bias logits during generation,
    making output more consistent with corpus patterns.
    """

    # Number of distinct token ids; all probability vectors have this length.
    vocab_size: int
    # current token -> Counter of next tokens
    bigram_counts: Dict[int, Counter] = field(default_factory=dict)
    # (prev, current) -> Counter of next tokens
    trigram_counts: Dict[Tuple[int, int], Counter] = field(default_factory=dict)
    # token -> Counter of tokens seen within `window_size` positions of it
    cooccur_counts: Dict[int, Counter] = field(default_factory=dict)
    # unigram frequencies over the whole corpus
    token_counts: Counter = field(default_factory=Counter)
    total_tokens: int = 0
    window_size: int = 5

    @classmethod
    def from_text(
        cls,
        text: str,
        vocab: "Vocab",
        window_size: int = 5,
    ) -> "CooccurField":
        """
        Build co-occurrence field from corpus text.

        Args:
            text: corpus text
            vocab: vocabulary for encoding
            window_size: context window for co-occurrence

        Returns:
            CooccurField with computed statistics
        """
        tokens = vocab.encode(text)
        n = len(tokens)

        bigram_counts: Dict[int, Counter] = defaultdict(Counter)
        trigram_counts: Dict[Tuple[int, int], Counter] = defaultdict(Counter)
        cooccur_counts: Dict[int, Counter] = defaultdict(Counter)

        # Unigram frequencies in one C-level pass.
        token_counts: Counter = Counter(tokens)

        # Bigrams: adjacent pairs.
        for curr, next_t in zip(tokens, tokens[1:]):
            bigram_counts[curr][next_t] += 1

        # Trigrams: adjacent triples keyed by the leading pair.
        for prev, curr, next_t in zip(tokens, tokens[1:], tokens[2:]):
            trigram_counts[(prev, curr)][next_t] += 1

        # Symmetric co-occurrence within a +/- window_size neighborhood
        # (the center token itself is excluded).
        for i, center in enumerate(tokens):
            start = max(0, i - window_size)
            end = min(n, i + window_size + 1)
            for j in range(start, end):
                if i != j:
                    cooccur_counts[center][tokens[j]] += 1

        return cls(
            vocab_size=vocab.vocab_size,
            bigram_counts=dict(bigram_counts),
            trigram_counts=dict(trigram_counts),
            cooccur_counts=dict(cooccur_counts),
            token_counts=token_counts,
            total_tokens=n,
            window_size=window_size,
        )

    def _probs_from_counter(self, counts: Counter) -> np.ndarray:
        """
        Convert a token Counter into a normalized probability vector.

        Tokens with ids >= vocab_size are skipped, so the result may sum
        to 0 even for a non-empty counter; callers handle that fallback.
        """
        probs = np.zeros(self.vocab_size, dtype=np.float32)
        total = sum(counts.values())
        if total > 0:
            for token, count in counts.items():
                if token < self.vocab_size:
                    probs[token] = count / total
        return probs

    def _uniform(self) -> np.ndarray:
        """Uniform distribution over the vocabulary."""
        return np.full(self.vocab_size, 1.0 / self.vocab_size, dtype=np.float32)

    def get_bigram_probs(self, current: int) -> np.ndarray:
        """
        Get probability distribution for next token given current.

        Returns uniform distribution if current token not seen.
        """
        probs = self._probs_from_counter(self.bigram_counts.get(current, Counter()))
        if probs.sum() == 0:
            return self._uniform()
        return probs

    def get_trigram_probs(self, prev: int, current: int) -> np.ndarray:
        """
        Get probability distribution for next token given (prev, current).

        Falls back to bigram if trigram not found.
        """
        counts = self.trigram_counts.get((prev, current))
        if counts is not None:
            probs = self._probs_from_counter(counts)
            if probs.sum() > 0:
                return probs
        return self.get_bigram_probs(current)

    def get_cooccur_bias(self, context: List[int]) -> np.ndarray:
        """
        Get bias vector based on co-occurrence with recent context.

        Tokens that frequently appear near context tokens get higher bias.
        Returns uniform if no context token has co-occurrence statistics.
        """
        bias = np.zeros(self.vocab_size, dtype=np.float32)

        # Only the last window_size context tokens contribute.
        for ctx_token in context[-self.window_size:]:
            counts = self.cooccur_counts.get(ctx_token)
            if counts:
                bias += self._probs_from_counter(counts)

        total = bias.sum()
        if total > 0:
            return bias / total
        return self._uniform()

    def bias_logits(
        self,
        logits: np.ndarray,
        context: List[int],
        alpha: float = 0.3,
        mode: str = "trigram",
    ) -> np.ndarray:
        """
        Bias logits using corpus statistics.

        Args:
            logits: raw model logits (vocab_size,)
            context: list of recent token indices
            alpha: blend factor (0 = pure model, 1 = pure corpus)
            mode: "bigram", "trigram", "cooccur", or "blend"

        Returns:
            biased logits (the input is returned unchanged for empty context)
        """
        if len(context) == 0:
            return logits

        if mode == "bigram":
            corpus_probs = self.get_bigram_probs(context[-1])
        elif mode == "trigram" and len(context) >= 2:
            corpus_probs = self.get_trigram_probs(context[-2], context[-1])
        elif mode == "cooccur":
            corpus_probs = self.get_cooccur_bias(context)
        elif mode == "blend":
            # Fixed 60/40 mix of trigram (or bigram) chain and co-occurrence.
            if len(context) >= 2:
                trigram = self.get_trigram_probs(context[-2], context[-1])
            else:
                trigram = self.get_bigram_probs(context[-1])
            cooccur = self.get_cooccur_bias(context)
            corpus_probs = 0.6 * trigram + 0.4 * cooccur
        else:
            # Unknown mode, or "trigram" with a single-token context.
            corpus_probs = self.get_bigram_probs(context[-1])

        # Epsilon keeps log() finite for zero-probability tokens.
        corpus_logits = np.log(corpus_probs + 1e-10)

        # Linear blend in log-space between model and corpus.
        return (1 - alpha) * logits + alpha * corpus_logits

    def sample_from_corpus(
        self,
        context: List[int],
        temperature: float = 1.0,
        mode: str = "trigram",
    ) -> int:
        """
        Sample next token purely from corpus statistics.

        Useful for testing corpus patterns without model.
        """
        if mode == "trigram" and len(context) >= 2:
            probs = self.get_trigram_probs(context[-2], context[-1])
        elif len(context) >= 1:
            probs = self.get_bigram_probs(context[-1])
        else:
            # No context: fall back to the corpus unigram distribution.
            probs = np.zeros(self.vocab_size, dtype=np.float32)
            for token, count in self.token_counts.items():
                if token < self.vocab_size:
                    probs[token] = count
            total = probs.sum()
            if total > 0:
                probs = probs / total
            else:
                # Empty corpus: uniform instead of a 0/0 NaN vector.
                probs = self._uniform()

        if temperature != 1.0:
            # Temperature sharpening/flattening; exponent is positive,
            # so zero entries stay zero.
            probs = np.power(probs, 1.0 / temperature)
            probs = probs / probs.sum()

        # Renormalize in float64: np.random.choice rejects probability
        # vectors whose float32 rounding error drifts the sum outside
        # its tolerance.
        p = probs.astype(np.float64)
        p = p / p.sum()
        return int(np.random.choice(self.vocab_size, p=p))

    def generate_from_corpus(
        self,
        seed: List[int],
        length: int = 100,
        temperature: float = 0.8,
        mode: str = "trigram",
    ) -> List[int]:
        """
        Generate tokens purely from corpus statistics.

        No model needed! Just trigram/bigram chains.
        This is how Leo generates - pure field dynamics.

        Returns the seed followed by `length` sampled tokens.
        """
        tokens = list(seed)

        for _ in range(length):
            next_token = self.sample_from_corpus(
                tokens,
                temperature=temperature,
                mode=mode,
            )
            tokens.append(next_token)

        return tokens

    def stats(self) -> Dict:
        """Return field statistics."""
        return {
            "total_tokens": self.total_tokens,
            "unique_tokens": len(self.token_counts),
            "bigram_contexts": len(self.bigram_counts),
            "trigram_contexts": len(self.trigram_counts),
            "cooccur_contexts": len(self.cooccur_counts),
            "window_size": self.window_size,
        }
| |
|
| |
|
def demo_cooccur(corpus_path: str = "text.txt") -> None:
    """
    Demo co-occurrence field generation.

    Shows that you can generate text purely from corpus statistics!

    Args:
        corpus_path: path to a UTF-8 text corpus; prints an error and
            returns early if the file does not exist.
    """
    from pathlib import Path

    # Works both as a package module and as a standalone script.
    try:
        from .haze import Vocab
    except ImportError:
        from haze import Vocab

    path = Path(corpus_path)
    if not path.exists():
        print(f"[error] {path} not found")
        return

    # Explicit encoding: the default read_text() encoding is
    # platform-dependent and can mis-decode the corpus on Windows.
    text = path.read_text(encoding="utf-8")
    vocab = Vocab.from_text(text)

    print("=" * 60)
    print("  CO-OCCURRENCE FIELD DEMO")
    print("=" * 60)
    print(f"  corpus: {path} ({len(text)} chars)")
    print(f"  vocab: {vocab.vocab_size} unique tokens")
    print()

    # Build statistics once, then report them.
    field = CooccurField.from_text(text, vocab, window_size=5)
    stats = field.stats()
    print("  field stats:")
    for k, v in stats.items():
        print(f"    {k}: {v}")
    print()

    seeds = ["the haze", "darling", "love"]

    print("=" * 60)
    print("  PURE CORPUS GENERATION (no model, just statistics)")
    print("=" * 60)

    for seed_text in seeds:
        seed_tokens = vocab.encode(seed_text)

        generated = field.generate_from_corpus(
            seed_tokens,
            length=80,
            temperature=0.7,
            mode="trigram",
        )

        output = vocab.decode(generated)
        print(f"\n>>> \"{seed_text}\"")
        print(output)

    print()
    print("=" * 60)
    print("  this is PURE CORPUS STATISTICS. no neural network.")
    print("  like leo's trigram graphs. resonance without weights.")
    print("=" * 60)
| |
|
| |
|
# Script entry point: run the demo against the default corpus file ("text.txt").
if __name__ == "__main__":
    demo_cooccur()
| |
|