import json

import faiss
import torch
from sentence_transformers import CrossEncoder, SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM

class Chronos:
    def __init__(self, model_dir="."):
        # Load the RAG configuration (embedder name, etc.) shipped with the model.
        with open(f"{model_dir}/rag_config.json") as f:
            config = json.load(f)
        self.embedder = SentenceTransformer(config["embedder_model"])
        # FAISS index over the embedded Wikipedia chunks.
        self.index = faiss.read_index(f"{model_dir}/jjk_index.faiss")
        # Chunks are stored as plain text, delimited by <|CHUNK_END|> markers.
        with open(f"{model_dir}/chunks.txt", "r", encoding="utf-8") as f:
            raw = f.read().split("<|CHUNK_END|>")
        self.chunks = [c.strip() for c in raw if c.strip()]
        # Cross-encoder for reranking retrieved candidates against the question.
        self.reranker = CrossEncoder(f"{model_dir}/cross_encoder_model")
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_dir, torch_dtype=torch.float16, device_map="auto", trust_remote_code=True
        )

    def ask(self, question, max_tokens=350):
        # Stage 1: dense retrieval — pull up to 30 candidate chunks by embedding similarity.
        q_emb = self.embedder.encode([question]).astype("float32")
        k = min(30, self.index.ntotal)
        _, indices = self.index.search(q_emb, k)
        # FAISS pads missing results with -1; skip those slots.
        candidates = [self.chunks[i] for i in indices[0] if i >= 0]
        # Stage 2: rerank with the cross-encoder and keep the 4 best chunks.
        pairs = [(question, c) for c in candidates]
        scores = self.reranker.predict(pairs)
        best = sorted(zip(scores, candidates), key=lambda x: x[0], reverse=True)[:4]
        context = "\n\n".join(c for _, c in best)
        # Stage 3: generate an answer conditioned on the reranked context.
        messages = [
            {"role": "system", "content": "You are Chronos, a historian specializing in the 20th century. Use the provided Wikipedia context to answer accurately. Be detailed but concise and friendly."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
        ]
        prompt = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        outputs = self.model.generate(**inputs, max_new_tokens=max_tokens,
                                      temperature=0.7, do_sample=True,
                                      pad_token_id=self.tokenizer.eos_token_id)
        # Decode only the newly generated tokens, not the echoed prompt.
        answer = self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
        return answer.strip()
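

# Minimal usage sketch (not part of the original file): it assumes the repo has
# been checked out locally so that rag_config.json, jjk_index.faiss, chunks.txt,
# the cross-encoder folder, and the model weights all sit in model_dir. The
# example question is purely illustrative.
if __name__ == "__main__":
    bot = Chronos(model_dir=".")  # placeholder path; point at the downloaded repo
    print(bot.ask("What triggered the Cuban Missile Crisis?"))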