|
|
| import json, torch, numpy as np |
| from sentence_transformers import SentenceTransformer, CrossEncoder |
| import faiss |
| from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
class JujutsuKaiserver:
    """Retrieval-augmented question answering over a Jujutsu Kaisen corpus.

    A question is embedded, the nearest chunks are fetched from a FAISS
    index, a cross-encoder re-scores those chunks, and the best ones are
    handed to a causal language model as the only context for its answer.

    Attributes are created in ``__init__``:

    * ``embedder``  - sentence-transformers model named in ``rag_config.json``
    * ``index``     - FAISS index read from ``jjk_index.faiss``
    * ``chunks``    - list of text chunks read from ``chunks.txt``
    * ``reranker``  - cross-encoder loaded from ``cross_encoder_model``
    * ``tokenizer`` / ``model`` - causal LM loaded from ``model_dir``
    """

    # Marker that separates consecutive chunks inside chunks.txt.
    _CHUNK_SEPARATOR = "<|CHUNK_END|>"

    # Small-talk inputs that are answered without touching any model.
    _GREETINGS = frozenset({'hi', 'hello', 'hey', 'yo', 'sup', 'hi there'})

    _GREETING_REPLY = (
        "Hey there! I'm JujutsuKaiserver, your all-knowing JJK assistant. "
        "Ask me anything!"
    )

    # Returned when retrieval yields no usable chunk, so the model is never
    # asked to answer "using ONLY the provided context" with empty context.
    _NO_CONTEXT_REPLY = (
        "Sorry, I couldn't find anything about that in my Jujutsu Kaisen "
        "notes. Try rephrasing your question!"
    )

    _SYSTEM_PROMPT = (
        "You are JujutsuKaiserver, an expert on Jujutsu Kaisen. "
        "Answer using ONLY the provided context. "
        "Be friendly and concise."
    )

    def __init__(self, model_dir="."):
        """Load every model and data file the assistant needs.

        Args:
            model_dir: Directory containing ``rag_config.json``,
                ``jjk_index.faiss``, ``chunks.txt``, the
                ``cross_encoder_model`` folder and the causal LM files.

        Raises:
            OSError: If a required file cannot be opened.
            KeyError: If ``rag_config.json`` has no ``"embedder_model"`` key.
        """
        with open(f"{model_dir}/rag_config.json", encoding="utf-8") as f:
            config = json.load(f)
        self.embedder = SentenceTransformer(config["embedder_model"])
        self.index = faiss.read_index(f"{model_dir}/jjk_index.faiss")

        with open(f"{model_dir}/chunks.txt", "r", encoding="utf-8") as f:
            raw_chunks = f.read().split(self._CHUNK_SEPARATOR)
        # NOTE(review): blank chunks are dropped here, so positions in
        # self.chunks only line up with FAISS ids if the index was built
        # from the same filtered list - confirm against the indexing script.
        self.chunks = [c.strip() for c in raw_chunks if c.strip()]

        self.reranker = CrossEncoder(f"{model_dir}/cross_encoder_model")

        # SECURITY: trust_remote_code=True executes Python shipped inside
        # model_dir. Only point this at a directory you control.
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_dir, trust_remote_code=True
        )
        self.model = AutoModelForCausalLM.from_pretrained(
            model_dir,
            torch_dtype=torch.float16,
            device_map='auto',
            trust_remote_code=True,
        )

    def ask(self, question, max_tokens=300, *, top_k=30, rerank_k=4):
        """Answer ``question`` from the indexed corpus.

        Args:
            question: The user's question as plain text.
            max_tokens: Upper bound on newly generated tokens.
            top_k: How many nearest chunks to request from the FAISS index.
            rerank_k: How many of the re-ranked chunks to put in the prompt.

        Returns:
            The stripped answer text. Plain greetings get a canned reply,
            and a canned apology is returned when retrieval finds no chunk.

        Raises:
            ValueError: If ``top_k`` or ``rerank_k`` is smaller than 1.
        """
        if top_k < 1 or rerank_k < 1:
            raise ValueError("top_k and rerank_k must both be at least 1")

        # Ignore case, surrounding whitespace and trailing punctuation so
        # that "Hi!" or "hey." are recognised as greetings too.
        normalized = question.strip().lower().rstrip("!.?, ")
        if normalized in self._GREETINGS:
            return self._GREETING_REPLY

        passages = self._retrieve(question, top_k, rerank_k)
        if not passages:
            return self._NO_CONTEXT_REPLY
        return self._generate(question, passages, max_tokens)

    def _retrieve(self, question, top_k, rerank_k):
        """Return up to ``rerank_k`` chunks, best cross-encoder score first."""
        query_vec = self.embedder.encode([question]).astype('float32')
        _, hit_ids = self.index.search(query_vec, top_k)

        # FAISS fills unused result slots with -1 when it finds fewer than
        # top_k neighbours; a negative id would silently select a chunk from
        # the end of the list, so keep only ids that really exist.
        n_chunks = len(self.chunks)
        candidates = [
            self.chunks[int(i)] for i in hit_ids[0] if 0 <= int(i) < n_chunks
        ]
        if not candidates:
            return []

        scores = self.reranker.predict([(question, c) for c in candidates])
        # Sort positions by score only. sorted() is stable even with
        # reverse=True, so equally scored chunks keep their retrieval order
        # instead of being ordered by their text.
        ranked = sorted(
            range(len(candidates)),
            key=lambda pos: float(scores[pos]),
            reverse=True,
        )
        return [candidates[pos] for pos in ranked[:rerank_k]]

    def _generate(self, question, passages, max_tokens):
        """Build the chat prompt from ``passages`` and sample an answer."""
        context = "\n\n".join(passages)
        messages = [
            {"role": "system", "content": self._SYSTEM_PROMPT},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
        ]
        prompt = self.tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        # The rendered template already carries its special tokens, so do
        # not let the tokenizer prepend/append another set.
        inputs = self.tokenizer(
            prompt, return_tensors="pt", add_special_tokens=False
        ).to(self.model.device)

        outputs = self.model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=0.7,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id,
        )
        # generate() returns prompt + continuation; keep the continuation.
        prompt_len = inputs["input_ids"].shape[1]
        answer = self.tokenizer.decode(
            outputs[0][prompt_len:], skip_special_tokens=True
        )
        return answer.strip()
|
|