# AetheronAI / model_numpy.py
# Uploaded by AICreator1 ("Upload model_numpy.py", commit d940e67, verified)
"""
AetheronAI — Retrieval + Markov model (numpy only, no torch)
"""
import json
import random
import re
import numpy as np
from pathlib import Path
from collections import Counter
class AetheronLite:
    """Retrieval + Markov (n-gram) text generator that needs only numpy.

    The model keeps three things learned from the training texts:

    * a word-level vocabulary (``vocab`` / ``inv_vocab``),
    * next-token counts for every context of 1 .. ``n - 1`` tokens
      (``ngrams``), used to continue a seed sentence,
    * the raw training sentences (``sentences``), used for keyword retrieval.

    ``generate`` retrieves the sentence sharing the most words with the
    prompt and then extends it token by token with the n-gram counts.
    """

    # Special tokens, listed in the order build_vocab assigns their ids.
    _SPECIAL = ("<pad>", "<unk>", "<bos>", "<eos>")
    _PAD_ID, _UNK_ID, _BOS_ID, _EOS_ID = range(4)
    # save() writes at most this many sentences to keep checkpoints small.
    _MAX_SAVED_SENTENCES = 8000
    _CHECKPOINT_NAME = "aetheron_lite.json"
    # Compiled once; both patterns are used inside per-word / per-text loops.
    _NON_WORD_RE = re.compile(r'[^\w]')
    _SENTENCE_END_RE = re.compile(r'(?<=[.!?])\s+')

    def __init__(self, n=3, vocab_size=8000):
        """Create an untrained model.

        Args:
            n: Order of the n-gram model (context is up to ``n - 1`` tokens).
            vocab_size: Maximum vocabulary size, special tokens included.

        Raises:
            ValueError: If ``n`` is smaller than 1.
        """
        if n < 1:
            raise ValueError(f"n must be >= 1, got {n}")
        self.n = n
        self.vocab_size = vocab_size
        self.vocab = {}          # word -> id
        self.inv_vocab = {}      # id -> word
        self.ngrams = {}         # context tuple of ids -> Counter of next ids
        self.unigrams = Counter()
        self.sentences = []
        self.trained = False

    # ── Vocab ─────────────────────────────────
    def build_vocab(self, texts):
        """Rebuild ``vocab``/``inv_vocab`` from the most frequent words.

        Words are normalised with ``_clean``; the four special tokens always
        occupy ids 0-3 and the remaining slots go to the most common words.

        Args:
            texts: Iterable of strings.
        """
        freq = Counter(
            word
            for text in texts
            for word in map(self._clean, text.split())
            if word
        )
        self.vocab = {token: i for i, token in enumerate(self._SPECIAL)}
        budget = max(self.vocab_size - len(self._SPECIAL), 0)
        for word, _ in freq.most_common(budget):
            self.vocab[word] = len(self.vocab)
        self.inv_vocab = {i: word for word, i in self.vocab.items()}

    def _clean(self, w):
        """Return ``w`` lower-cased with every non-word character removed."""
        return self._NON_WORD_RE.sub('', w.lower())

    def tok(self, text) -> list:
        """Encode ``text`` as ``[<bos>, word ids..., <eos>]``.

        Words missing from the vocabulary map to ``<unk>``; tokens that are
        empty after cleaning (pure punctuation) are skipped.
        """
        unk = self.vocab.get("<unk>", self._UNK_ID)
        ids = [self.vocab.get("<bos>", self._BOS_ID)]
        for raw in text.split():
            word = self._clean(raw)
            if word:
                ids.append(self.vocab.get(word, unk))
        ids.append(self.vocab.get("<eos>", self._EOS_ID))
        return ids

    def detok(self, ids) -> str:
        """Decode ids to a space-joined string.

        ``<bos>`` and ``<pad>`` are dropped, decoding stops at the first
        ``<eos>``, and ids absent from ``inv_vocab`` are ignored.
        """
        skip = {self.vocab.get(s, -1) for s in ("<bos>", "<pad>")}
        eos = self.vocab.get("<eos>", self._EOS_ID)
        words = []
        for i in ids:
            if i == eos:
                break
            if i not in skip:
                words.append(self.inv_vocab.get(i, ""))
        return " ".join(w for w in words if w)

    # ── Train ─────────────────────────────────
    def train(self, texts):
        """Fit vocabulary, sentence store and n-gram counts on ``texts``.

        Any previously learned state is discarded, because the vocabulary
        (and therefore every token id) is rebuilt from scratch.

        Args:
            texts: Iterable of strings; it is materialised once so that
                generators can be passed safely.
        """
        # The data is traversed three times below; a one-shot iterator would
        # otherwise be exhausted after the vocabulary pass.
        texts = list(texts)

        print("[Model] Строю словарь...")
        self.build_vocab(texts)

        # Collect every sentence of a usable length (5..60 words).
        self.sentences = []
        for text in texts:
            for sent in self._SENTENCE_END_RE.split(text):
                sent = sent.strip()
                if 5 <= len(sent.split()) <= 60:
                    self.sentences.append(sent)
        random.shuffle(self.sentences)
        print(f"[Model] Предложений: {len(self.sentences):,}")

        # Old counts are keyed by ids of the previous vocabulary: drop them.
        self.ngrams = {}
        self.unigrams = Counter()
        # Count continuations for every context length 1 .. n-1 so that the
        # back-off in generate() has shorter contexts to fall back on.
        # For n == 1 the only context is the empty tuple (length 0).
        context_lengths = range(min(1, self.n - 1), self.n)
        for text in texts:
            ids = self.tok(text)
            self.unigrams.update(ids)
            for k in context_lengths:
                for i in range(len(ids) - k):
                    ctx = tuple(ids[i:i + k])
                    self.ngrams.setdefault(ctx, Counter())[ids[i + k]] += 1

        self.trained = True
        print(f"[Model] Готово: {len(self.vocab):,} слов, {len(self.ngrams):,} n-gram")

    # ── Retrieval ─────────────────────────────
    def find_relevant(self, query, top_n=5):
        """Return up to ``top_n`` sentences sharing the most words with ``query``.

        Query words of fewer than three characters are ignored. Scoring is
        the number of distinct cleaned words a sentence has in common with
        the query. When the query has no usable words, or nothing matches,
        a random sample of sentences is returned instead.

        Args:
            query: Free-text query.
            top_n: Maximum number of sentences to return.

        Returns:
            A list of sentences, best match first; empty if the model holds
            no sentences.
        """
        if not self.sentences:
            return []
        sample_size = min(top_n, len(self.sentences))

        # Punctuation-only tokens clean to "" and must not count as a word,
        # otherwise "" in the query would match "" in a sentence.
        q_words = {self._clean(w) for w in query.split() if len(w) > 2}
        q_words.discard("")
        if not q_words:
            return random.sample(self.sentences, sample_size)

        scored = []
        for sentence in self.sentences:
            s_words = {self._clean(w) for w in sentence.split()}
            score = len(q_words & s_words)
            if score > 0:
                scored.append((score, sentence))
        if not scored:
            return random.sample(self.sentences, sample_size)

        # Stable sort: ties keep their order in self.sentences.
        scored.sort(key=lambda pair: -pair[0])
        return [sentence for _, sentence in scored[:top_n]]

    # ── Generate ──────────────────────────────
    def _next_counts(self, tokens):
        """Return next-token counts for the longest known suffix of ``tokens``.

        Tries contexts of ``n - 1`` tokens down to the empty context and
        returns ``None`` when none of them was seen in training.
        """
        for k in range(self.n - 1, -1, -1):
            # tokens[-0:] would be the whole list, so handle k == 0 apart.
            ctx = tuple(tokens[-k:]) if k else ()
            counts = self.ngrams.get(ctx)
            if counts:
                return counts
        return None

    def generate(self, prompt="", max_tokens=50, temperature=0.8, top_k=20) -> str:
        """Generate text: retrieve a seed sentence, then extend it by n-grams.

        Args:
            prompt: Query used to pick the seed sentence.
            max_tokens: Maximum number of tokens appended to the seed.
            temperature: Softmax temperature applied to log-counts.
            top_k: Only the ``top_k`` most frequent continuations are sampled.

        Returns:
            The generated text, or a fixed notice if the model is untrained.
        """
        if not self.trained:
            return "Модель не обучена. Нажмите Обучение → Запустить."

        # The best-matching sentence is the seed. With no stored sentences
        # the seed is empty and generation starts right after <bos>.
        relevant = self.find_relevant(prompt, top_n=3)
        base = relevant[0] if relevant else ""

        eos = self.vocab.get("<eos>", self._EOS_ID)
        tokens = self.tok(base)
        # tok() terminates the seed with <eos>; no training context ends in
        # <eos>, so it must be removed or the continuation can never start.
        if tokens and tokens[-1] == eos:
            tokens.pop()

        # Continue the seed with the n-gram model.
        for _ in range(max_tokens):
            counts = self._next_counts(tokens)
            if counts is None:
                break
            items = counts.most_common(top_k)
            if not items:
                break
            candidates = np.array([tok_id for tok_id, _ in items])
            logits = np.array([float(c) for _, c in items])
            logits = np.log(logits + 1e-8) / max(temperature, 1e-8)
            logits -= logits.max()  # numerical stability before exp
            probs = np.exp(logits)
            probs /= probs.sum()
            next_tok = int(np.random.choice(candidates, p=probs))
            if next_tok == eos:
                break
            tokens.append(next_tok)

        result = self.detok(tokens)
        # Guarantee a non-trivial answer: fall back to the retrieved text
        # when fewer than three words were decoded.
        if len(result.split()) < 3:
            fallback = ". ".join(relevant[:2]) if len(relevant) >= 2 else base
            result = fallback or result
        return result

    def num_parameters(self) -> int:
        """Return the number of stored (context, next-token) count entries."""
        return sum(len(v) for v in self.ngrams.values())

    # ── Save / Load ───────────────────────────
    def save(self, path=None):
        """Write the model as JSON to ``<path>/aetheron_lite.json``.

        Args:
            path: Target directory (created if missing); defaults to
                ``models/checkpoints``. At most the first 8000 sentences
                are stored.
        """
        path = path or "models/checkpoints"
        Path(path).mkdir(parents=True, exist_ok=True)
        data = {
            "n": self.n,
            "vocab_size": self.vocab_size,
            "trained": self.trained,
            "vocab": self.vocab,
            "inv_vocab": {str(k): v for k, v in self.inv_vocab.items()},
            "unigrams": {str(k): v for k, v in self.unigrams.items()},
            # JSON keys must be strings, so context tuples are JSON-encoded.
            "ngrams": {json.dumps(list(k)): dict(v) for k, v in self.ngrams.items()},
            "sentences": self.sentences[:self._MAX_SAVED_SENTENCES],
        }
        fpath = Path(path) / self._CHECKPOINT_NAME
        with open(fpath, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
        print(f"[Model] Сохранено: {fpath}")

    @classmethod
    def load(cls, path="models/checkpoints") -> "AetheronLite | None":
        """Load a model previously written by ``save``.

        Args:
            path: Directory containing ``aetheron_lite.json``.

        Returns:
            The restored model, or ``None`` if the checkpoint file is absent.
        """
        fpath = Path(path) / cls._CHECKPOINT_NAME
        if not fpath.exists():
            return None
        with open(fpath, encoding="utf-8") as f:
            data = json.load(f)

        init_kwargs = {"n": data["n"]}
        # Checkpoints written by older versions carry no vocab_size.
        if "vocab_size" in data:
            init_kwargs["vocab_size"] = data["vocab_size"]
        m = cls(**init_kwargs)

        m.vocab = data["vocab"]
        m.inv_vocab = {int(k): v for k, v in data["inv_vocab"].items()}
        m.sentences = data.get("sentences", [])
        m.unigrams = Counter({int(k): v for k, v in data["unigrams"].items()})
        m.ngrams = {
            tuple(json.loads(k_str)): Counter({int(t): c for t, c in v.items()})
            for k_str, v in data["ngrams"].items()
        }
        m.trained = data["trained"]
        print(f"[Model] Загружено: {len(m.vocab):,} слов, {len(m.sentences):,} предложений")
        return m