import ast
import json
import os
import re

import torch
import transformers
from transformers import LogitsProcessor, LogitsProcessorList, AutoModelForCausalLM, AutoTokenizer
|
|
# System prompt injected into every chat request in generate_response().
CONTROLLED_REASONING_CORE = "You are a helpful assistant with a Controlled Reasoning Core. Please reason step by step."
|
|
class InterventionLogitsProcessor(LogitsProcessor):
    """Logits processor that adds a constant bonus to one vocabulary id
    at every decoding step, nudging generation toward that token."""

    def __init__(self, boost_token_id, boost_value=2.0):
        # Token id to favour and the additive bonus applied to its logit.
        self.boost_token_id = boost_token_id
        self.boost_value = boost_value

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        # Raise the chosen token's score for every sequence in the batch;
        # scores is updated in place and handed back to generate().
        boosted_column = scores[:, self.boost_token_id] + self.boost_value
        scores[:, self.boost_token_id] = boosted_column
        return scores
|
|
class prettybird_bce_basic_brain_mini:
    """Small causal-LM wrapper with a calculator tool, a logit-boost
    intervention hook, a toy math reward, and in-place parameter editing.

    Loads a local merged checkpoint when one is found, otherwise falls
    back to the public Qwen2.5-Math-1.5B-Instruct hub model.
    """

    def __init__(self, model_path="qwen_merged", device="cuda" if torch.cuda.is_available() else "cpu"):
        """Resolve the checkpoint path and load tokenizer + model.

        Args:
            model_path: Directory of a merged checkpoint; a few fallback
                locations are probed before defaulting to the hub model.
            device: Target for ``device_map`` (CUDA when available).

        Raises:
            RuntimeError: If the model cannot be loaded either natively
                or with ``trust_remote_code=True``.
        """
        self.device = device
        print(f"Transformers version: {transformers.__version__}")

        # Probe a few likely locations for the local checkpoint.
        local_path = model_path
        if not os.path.exists(local_path):
            if os.path.exists(os.path.join("llama.cpp", model_path)):
                local_path = os.path.join("llama.cpp", model_path)
            elif os.path.exists("/content/qwen_merged"):
                local_path = "/content/qwen_merged"

        # Fall back to the public hub checkpoint when nothing exists locally.
        final_path = local_path if os.path.exists(local_path) else "Qwen/Qwen2.5-Math-1.5B-Instruct"
        print(f"Loading model from {final_path}...")

        try:
            # Prefer the safer trust_remote_code=False load first.
            self.tokenizer = AutoTokenizer.from_pretrained(final_path, trust_remote_code=False)
            self.model = AutoModelForCausalLM.from_pretrained(
                final_path,
                device_map=device,
                trust_remote_code=False,
                torch_dtype=torch.float16
            )
            print("Loaded natively.")
        except Exception as e:
            print(f"Native load failed: {e}. Trying remote code...")
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(final_path, trust_remote_code=True)
                self.model = AutoModelForCausalLM.from_pretrained(
                    final_path,
                    device_map=device,
                    trust_remote_code=True,
                    torch_dtype=torch.float16
                )
                print("Loaded with remote code.")
            except Exception as e2:
                # Chain the cause so the original traceback is preserved.
                raise RuntimeError(f"Failed to load model: {e2}") from e2

        # Some checkpoints ship without a pad token; reuse EOS so padded
        # generation does not fail.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def math_reward(self, response):
        """Toy reward: +1.0 for a LaTeX \\boxed{...} answer, +0.5 when the
        response is longer than 50 characters. Returns a float in {0, 0.5, 1.0, 1.5}.
        """
        score = 0.0
        if re.search(r"\\boxed\{.*?\}", response):
            score += 1.0
        if len(response) > 50:
            score += 0.5
        return score

    def parameter_editing(self, layer_idx=0, noise_scale=1e-5):
        """Add Gaussian noise in place to layer ``layer_idx``'s q_proj weight.

        Best-effort: any failure (missing attribute, bad index) is printed
        rather than raised, matching the original behavior.
        """
        print(f"Editing parameters in layer {layer_idx}...")
        try:
            with torch.no_grad():
                # Wrapped models (e.g. *ForCausalLM) keep layers under .model.
                if hasattr(self.model, 'model'):
                    layers = self.model.model.layers
                else:
                    layers = self.model.layers
                weights = layers[layer_idx].self_attn.q_proj.weight
                noise = torch.randn_like(weights) * noise_scale
                weights.add_(noise)
            print("Parameter editing complete.")
        except Exception as e:
            print(f"Error editing params: {e}")

    @staticmethod
    def _safe_eval(expression):
        """Evaluate a basic arithmetic expression (+ - * /, unary signs,
        parentheses, int/float literals) via the AST instead of eval(),
        so user-derived text can never execute arbitrary code.

        Raises:
            ValueError: On any node outside the allowed arithmetic subset.
            SyntaxError / ZeroDivisionError: Propagated to the caller.
        """
        def walk(node):
            if isinstance(node, ast.Expression):
                return walk(node.body)
            if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
                return node.value
            if isinstance(node, ast.UnaryOp) and isinstance(node.op, (ast.UAdd, ast.USub)):
                value = walk(node.operand)
                return -value if isinstance(node.op, ast.USub) else +value
            if isinstance(node, ast.BinOp):
                left, right = walk(node.left), walk(node.right)
                if isinstance(node.op, ast.Add):
                    return left + right
                if isinstance(node.op, ast.Sub):
                    return left - right
                if isinstance(node.op, ast.Mult):
                    return left * right
                if isinstance(node.op, ast.Div):
                    return left / right
            raise ValueError("unsupported expression")

        return walk(ast.parse(expression, mode="eval"))

    def run_tool(self, tool_name, query):
        """Dispatch a tool call; only "calculator" is implemented.

        Returns the stringified arithmetic result, "Invalid" when nothing
        evaluable survives sanitization, "Error" on evaluation failure
        (bad syntax, division by zero), or "Unknown" for other tools.
        """
        if tool_name == "calculator":
            try:
                # Keep only digits, the four operators, parentheses, dots
                # and spaces before evaluating.
                clean_query = re.sub(r"[^0-9+\-*/(). ]", "", query)
                if not clean_query.strip():
                    return "Invalid"
                # AST-based evaluation replaces the previous eval() call.
                return str(self._safe_eval(clean_query))
            except Exception:
                return "Error"
        return "Unknown"

    def generate_response(self, query, use_tool=False, use_intervention=False):
        """Generate a chat response, optionally pre-running the calculator
        and/or boosting the first "Therefore" sub-token during decoding.

        Returns the decoded completion with the prompt tokens stripped.
        Sampling is enabled, so output is nondeterministic.
        """
        input_text = query
        # Auto-invoke the calculator when requested, or when the query
        # mentions "calculate" and contains an arithmetic-looking span.
        if use_tool or "calculate" in query.lower():
            match = re.search(r"([\d\.\s\+\-\*\/\(\)]+)", query)
            if match and len(match.group(1).strip()) > 3:
                res = self.run_tool("calculator", match.group(1))
                input_text += f"\nTool Result: {res}"

        messages = [{"role": "system", "content": CONTROLLED_REASONING_CORE}, {"role": "user", "content": input_text}]
        inputs = self.tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True, return_tensors="pt").to(self.device)

        logits_processor = LogitsProcessorList()
        if use_intervention:
            print("Applying intervention...")
            # Boost only the first sub-token of "Therefore".
            ids = self.tokenizer.encode("Therefore", add_special_tokens=False)
            if ids:
                logits_processor.append(InterventionLogitsProcessor(ids[0], 5.0))

        outputs = self.model.generate(inputs, max_new_tokens=100, logits_processor=logits_processor, do_sample=True)
        # Decode only the newly generated tokens (drop the prompt prefix).
        response = self.tokenizer.decode(outputs[0][len(inputs[0]):], skip_special_tokens=True)

        print(f"Response: {response}")
        print(f"Reward: {self.math_reward(response)}")
        return response
|
|