| from openai import OpenAI |
| import os |
|
|
| |
|
|
| class QwqChatBot: |
| def __init__(self, model="qwq-32b"): |
| self.client = OpenAI( |
| api_key="sk-1a5efcc4ce48413480e1001fe84be787", |
| base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" |
| ) |
| self.model = model |
| self.messages = [] |
|
|
| def ask(self, user_input: str) -> dict: |
| """用户提问,返回dict包含思考过程、最终回答、合并输出""" |
| self.messages.append({"role": "user", "content": user_input}) |
|
|
| reasoning_content = "" |
| answer_content = "" |
| is_answering = False |
|
|
| completion = self.client.chat.completions.create( |
| model=self.model, |
| messages=self.messages, |
| stream=True |
| ) |
|
|
| for chunk in completion: |
| if not chunk.choices: |
| continue |
|
|
| delta = chunk.choices[0].delta |
|
|
| if hasattr(delta, "reasoning_content") and delta.reasoning_content: |
| reasoning_content += delta.reasoning_content |
| elif hasattr(delta, "content") and delta.content: |
| is_answering = True |
| answer_content += delta.content |
|
|
| |
| self.messages.append({"role": "assistant", "content": answer_content}) |
|
|
| return { |
| "reasoning": reasoning_content, |
| "answer": answer_content, |
| "full_output": reasoning_content + "</think>" + answer_content |
| } |
|
|
| def get_reasoning_and_answer(prompt: str, model: str = "qwq-32b") -> str: |
| client = OpenAI( |
| api_key="sk-1a5efcc4ce48413480e1001fe84be787", |
| base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" |
| ) |
| prompt = prompt + " " |
| stream = client.chat.completions.create( |
| model=model, |
| messages=[{"role": "user", "content": prompt}], |
| stream=True |
| ) |
|
|
| reasoning = "" |
| answer = "" |
| in_answer = False |
|
|
| for chunk in stream: |
| if not chunk.choices: |
| continue |
|
|
| delta = chunk.choices[0].delta |
|
|
| if hasattr(delta, "reasoning_content") and delta.reasoning_content is not None: |
| reasoning += delta.reasoning_content |
| elif hasattr(delta, "content") and delta.content: |
| in_answer = True |
| answer += delta.content |
|
|
| if in_answer: |
| return reasoning + "</think>" + answer |
| else: |
| return reasoning |
| |
| |
| from datasets import load_dataset |
| jbb_dataset = load_dataset("JailbreakBench/JBB-Behaviors", "behaviors") |
| harmful_split = jbb_dataset["harmful"] |
|
|
| root_dir="/home/tianqiu/learn/benchmark/result/" |
| import os |
| import json |
| import datetime |
| result_dir=datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") |
| |
| result_dir = os.path.join(root_dir, result_dir) |
| os.makedirs(result_dir, exist_ok=True) |
|
|
|
|
| with open("/home/tianqiu/learn/benchmark/llama2.json", "r") as f: |
| attack_sample = json.load(f) |
|
|
| for i in range (10): |
| |
| message = harmful_split[i]["Goal"] |
| |
| |
| print("Message:", message) |
| response,prompt =get_reasoning_and_answer(message) |
| print("Response:", response) |
| print("==="*10) |
| json_data={ |
| "goal": message, |
| "prompt":prompt, |
| "response": response, |
| } |
| file_name = os.path.join(result_dir, f"result_{i}.json") |
| with open(file_name, "w") as f: |
| json.dump(json_data, f, indent=4) |