# File size: 7,983 bytes
# blob: 0c51b93
import json
import os
import re
from typing import Any, Dict, List, Tuple
from openai import OpenAI
from sotopia_rl.prompter.generic_templates import (
DIMENSION_DESCRIPTION_DICT,
DIRECT_ATTRIBUTION_TEMPLATE,
SCALE_GUIDELINE_DICT,
)
def openai_call(prompt: str, model: str = "gpt-3.5-turbo") -> str | None:
    """Send a single user message to a chat-completion endpoint and return the reply.

    Routes by model name to one of several OpenAI-compatible endpoints:
    plain OpenAI models, ``together_ai/...``, ``claude/...``, or ``gemini/...``.
    For the prefixed providers the leading ``<provider>/`` segment is stripped
    before the API call.

    Args:
        prompt: The user message to send.
        model: Model identifier, optionally prefixed with a provider name.

    Returns:
        The assistant message content (None if the API returns no content).

    Raises:
        ValueError: If the model name matches no supported provider.
    """
    if model in ["gpt-3.5-turbo", "gpt-4", "gpt-4o", "o4-mini"]:
        # Default OpenAI endpoint; JSON mode forces a JSON-object response.
        client = OpenAI()
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        return response.choices[0].message.content

    # Non-OpenAI providers reached through their OpenAI-compatible endpoints.
    # NOTE(review): these branches do not request JSON mode, so downstream
    # parsing relies on extract_json() as a fallback.
    if model.startswith("together_ai"):
        api_key_env = "TOGETHER_API_KEY"
        base_url = "https://api.together.xyz/v1"
    elif model.startswith("claude"):
        api_key_env = "CLAUDE_API_KEY"
        base_url = "https://api.anthropic.com/v1"
    elif model.startswith("gemini"):
        api_key_env = "GEMINI_API_KEY"
        base_url = "https://generativelanguage.googleapis.com/v1beta/openai/"
    else:
        raise ValueError(f"Model {model} not supported.")

    client = OpenAI(api_key=os.environ.get(api_key_env), base_url=base_url)
    # Drop the "<provider>/" prefix; any remaining slashes stay in the model id.
    provider_model = "/".join(model.split("/")[1:])
    response = client.chat.completions.create(
        model=provider_model,
        messages=[
            {"role": "user", "content": prompt},
        ],
    )
    return response.choices[0].message.content
def extract_json(text: str) -> str | None:
# Use regex to find the JSON string within the text
match = re.search(r"\{\n.*?\n\}", text, re.DOTALL)
if match:
json_str = match.group(0)
return json_str
else:
return None
def get_attribution_formatting_instructions(agent: str) -> str:
    """Return the JSON-output formatting instructions appended to attribution prompts.

    The emitted text tells the LLM to score every utterance by *agent* under
    keys of the form "Utterance X by <agent>" — the same key format that
    fill_in_attribution_scores() looks up later in the pipeline.
    """
    return f"""
Please format your response as JSON with the following structure:
{{
"Utterance 0 by {agent}": 0,
"Utterance 1 by {agent}": 2,
...
}}
The utterance numbers should correspond to their order in the conversation. Each score should reflect how much the utterance contributed to achieving the agent's goals. Please annotate every utterance made by an agent in the conversation, denoted "Utterance X by agent_name". For example, "Utterance 6 by Finnegan O'Malley". Please give a score even if the utterance is the end of the conversation.
"""
def get_single_attribution_prompt(
    conversation: List[Tuple[str, str]],
    agent: str,
    agent_goal: str,
    agent_background: str,
    dimension: str,
    scale: str,
) -> str:
    """Build the direct-attribution prompt for one agent in one conversation.

    Args:
        conversation: Ordered (speaker, utterance) pairs.
        agent: Name of the agent whose utterances will be scored.
        agent_goal: The agent's goal text.
        agent_background: The agent's background description.
        dimension: Key into DIMENSION_DESCRIPTION_DICT.
        scale: Key into SCALE_GUIDELINE_DICT.

    Returns:
        The fully formatted prompt string. (The original annotation claimed a
        tuple, but only the prompt string was ever returned.)

    Raises:
        KeyError: If *dimension* or *scale* is not a known key.
    """
    scoring_guidelines = SCALE_GUIDELINE_DICT[scale]
    dimension_description = DIMENSION_DESCRIPTION_DICT[dimension]
    conversation_prompt = "Conversation:\n"
    for i, (speaker, utterance) in enumerate(conversation):
        # i // 2 numbers utterances per speaker — assumes the two agents
        # strictly alternate turns. TODO(review): confirm upstream ordering.
        conversation_prompt += f"Utterance {i//2} by {speaker}: {utterance}\n"
    formatting_instruction = get_attribution_formatting_instructions(agent)
    prompt = DIRECT_ATTRIBUTION_TEMPLATE.format(
        scoring_guidelines=scoring_guidelines,
        agent=agent,
        goal=agent_goal,
        agent_background=agent_background,
        conversation=conversation_prompt,
        dimension=dimension,
        dimension_description=dimension_description,
        formatting_instructions=formatting_instruction,
    )
    return prompt
def assign_attributions_for_conversation(
    prompt: str,
    conversation: List[Tuple[str, str]],
    agent: str,
    llm_name: str = "gpt-3.5-turbo",
) -> Dict[str, int]:
    """Query the LLM for per-utterance attribution scores, retrying on bad output.

    Retries up to 5 times when the response is not valid JSON, contains
    non-integer values, or has a different number of entries than the agent
    has utterances. After the final attempt the last (possibly mismatched)
    result is returned rather than failing hard.

    Args:
        prompt: The attribution prompt built by get_single_attribution_prompt.
        conversation: Ordered (speaker, utterance) pairs.
        agent: Name of the agent whose utterances are being scored.
        llm_name: Model identifier passed to openai_call.

    Returns:
        Mapping from "Utterance X by <agent>" keys to integer scores; an
        empty dict when the LLM call or JSON extraction fails outright.
    """
    # Loop-invariant: number of utterances the agent must receive scores for.
    uttr_count = sum(1 for speaker, _ in conversation if speaker == agent)
    result: Dict[str, int] = {}
    for attempt in range(5):
        response = openai_call(
            prompt + f"\nYou are supposed to be returning {uttr_count} attributions.",
            llm_name,
        )
        if response is None:
            print("Failed to get response from OpenAI; returning empty dictionary")
            return {}
        try:
            result = json.loads(response)
        except json.JSONDecodeError:
            # Response was not bare JSON; try to carve a JSON object out of it.
            formatted_response = extract_json(response)
            if formatted_response is None:
                print(
                    "Failed to extract JSON string from response; returning empty dictionary"
                )
                print(response)
                return {}
            result = json.loads(formatted_response)
        try:
            for key in result:
                result[key] = int(result[key])
        except ValueError:
            print("Failed to convert all values to integers; retrying")
            continue
        if uttr_count == len(result):
            break
        if attempt < 4:
            print("Response length does not match the number of agent utterances; retrying")
        else:
            print("Response length does not match the number of agent utterances after 5 attempts; returning original dictionary")
    return result
def calc_reward(utter_attrib: float, scale: str, dim_score: float) -> float:
    """Convert one attribution score into a reward.

    The attribution is normalized by the scale's maximum value and weighted
    by the dimension score; a sentinel attribution of -1 maps straight to a
    reward of -1.0.

    Raises:
        KeyError: If *scale* is not one of "default", "5_scale", "10_scale".
    """
    scale_max = {"default": 3, "5_scale": 5, "10_scale": 10}[scale]
    if utter_attrib == -1:
        return -1.0
    return utter_attrib / scale_max * dim_score
def calc_attributed_reward(
    attributed_data: Dict[str, float | int],
    scale: str,
    dim_score: float | int,
) -> Dict[str, Dict[str, Any]]:
    """Attach a computed reward to every attribution entry.

    The original annotations declared a list parameter and list return, but
    the body iterates ``.items()`` and builds a dict — the annotations are
    corrected here; runtime behavior is unchanged.

    Args:
        attributed_data: Mapping from utterance keys to attribution scores.
        scale: Scale name forwarded to calc_reward.
        dim_score: Dimension score forwarded to calc_reward.

    Returns:
        {utterance_key: {"reward": <float>, "attribution": <original score>}}
    """
    return {
        key: {"reward": calc_reward(score, scale, dim_score), "attribution": score}
        for key, score in attributed_data.items()
    }
def fill_in_attribution_scores(
    conversation: List[Tuple[str, str]],
    raw_attribution_scores: Dict[str, Any],
    agent: str,
) -> Dict[str, Any]:
    """Produce a complete score map for *agent*, defaulting missing entries to 0.

    Walks the conversation and, for every utterance spoken by *agent*, copies
    the LLM-provided score stored under its "Utterance X by <agent>" key, or
    fills in 0 when the LLM skipped that utterance.
    """
    filled: Dict[str, Any] = {}
    for turn, (speaker, _utterance) in enumerate(conversation):
        if speaker == agent:
            # turn // 2 matches the per-speaker numbering used in the prompt.
            label = f"Utterance {turn // 2} by {speaker}"
            filled[label] = raw_attribution_scores.get(label, 0)
    return filled
# unified function
def get_attribution_single_conv(conversation, agent, goals, episode, rewards, llm_name, attribution_instruction_name):
    """End-to-end attribution for one agent in one conversation.

    Builds the attribution prompt, queries the LLM for per-utterance scores,
    and converts them into rewards annotated with dimension/scale metadata.

    *attribution_instruction_name* has the form "<scale>-<dimension>", where
    both parts must be known keys of the guideline/description dicts.
    """
    scale, dimension = attribution_instruction_name.split("-")
    assert scale in SCALE_GUIDELINE_DICT, f"Scale {scale} not in scale dict"
    assert dimension in DIMENSION_DESCRIPTION_DICT, f"Dimension {dimension} not in dimension dict"

    prompt = get_single_attribution_prompt(
        conversation,
        agent,
        goals[agent],
        episode["agents_background"][agent],
        dimension,
        scale,
    )
    attribution_scores = assign_attributions_for_conversation(
        prompt, conversation, agent, llm_name=llm_name
    )

    # Choose the dimension-level score that weights the per-utterance rewards.
    if dimension == "conversation_behavior":
        dim_score = 10
    elif dimension == "goal_barebone":
        dim_score = rewards[agent]["goal"]
    else:
        dim_score = rewards[agent][dimension]

    attribution_rewards = calc_attributed_reward(attribution_scores, scale, dim_score)
    for entry in attribution_rewards.values():
        entry["dimension"] = dimension
        entry["scale"] = scale
        entry["dim_score"] = dim_score
    return attribution_rewards