|
|
| import json, torch, numpy as np |
| from sentence_transformers import SentenceTransformer, CrossEncoder |
| import faiss |
| from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
| class Chronos: |
| def __init__(self, model_dir="."): |
| with open(f"{model_dir}/rag_config.json") as f: |
| config = json.load(f) |
| self.embedder = SentenceTransformer(config["embedder_model"]) |
| self.index = faiss.read_index(f"{model_dir}/jjk_index.faiss") |
| with open(f"{model_dir}/chunks.txt", "r", encoding="utf-8") as f: |
| raw = f.read().split("<|CHUNK_END|>") |
| self.chunks = [c.strip() for c in raw if c.strip()] |
| self.reranker = CrossEncoder(f"{model_dir}/cross_encoder_model") |
| self.tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True) |
| self.model = AutoModelForCausalLM.from_pretrained( |
| model_dir, torch_dtype=torch.float16, device_map='auto', trust_remote_code=True |
| ) |
|
|
| def ask(self, question, max_tokens=350): |
| q_emb = self.embedder.encode([question]).astype('float32') |
| _, indices = self.index.search(q_emb, 30) |
| candidates = [self.chunks[i] for i in indices[0]] |
| pairs = [(question, c) for c in candidates] |
| scores = self.reranker.predict(pairs) |
| best = sorted(zip(scores, candidates), reverse=True)[:4] |
| context = "\n\n".join([c for _, c in best]) |
| messages = [ |
| {"role": "system", "content": "You are Chronos, a historian specializing in the 20th century. Use the provided Wikipedia context to answer accurately. Be detailed but concise and friendly."}, |
| {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"} |
| ] |
| prompt = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) |
| inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device) |
| outputs = self.model.generate(**inputs, max_new_tokens=max_tokens, |
| temperature=0.7, do_sample=True, |
| pad_token_id=self.tokenizer.eos_token_id) |
| answer = self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True) |
| return answer.strip() |
|
|