# blog2code-api / codes / utils.py
# (file page residue preserved as comments so the module parses)
# srishtichugh's picture
# initial commit
# 2fd8593
import json
import re
import os
from datetime import datetime
def extract_planning(trajectories_json_file_path):
    """Load a trajectory JSON file and return up to the first three
    assistant messages, with any chain-of-thought prefix (text before
    a closing ``</think>`` tag) stripped off.

    Args:
        trajectories_json_file_path: path to a JSON list of
            {"role": ..., "content": ...} turn dicts.

    Returns:
        list[str]: at most three cleaned assistant contents, in order.
    """
    with open(trajectories_json_file_path) as fp:
        turns = json.load(fp)
    assistant_msgs = []
    for entry in turns:
        if entry['role'] != 'assistant':
            continue
        text = entry['content']
        # Keep only what follows the last </think> marker, if present.
        if "</think>" in text:
            text = text.split("</think>")[-1].strip()
        assistant_msgs.append(text)
    # Only the first three planning turns are used downstream.
    return assistant_msgs[:3]
def content_to_json(data):
    """First-pass lenient JSON parser for LLM output.

    Strips [CONTENT]/[/CONTENT] markers, "#"-style trailing comments
    after quoted strings, trailing commas before "]", and newlines,
    then attempts json.loads. Falls back to content_to_json2 on
    failure.
    """
    cleaned = re.sub(r'\[CONTENT\]|\[/CONTENT\]', '', data).strip()
    # "~~~~", #comment -> "~~~~",
    cleaned = re.sub(r'(".*?"),\s*#.*', r'\1,', cleaned)
    # trailing comma before closing bracket
    cleaned = re.sub(r',\s*\]', ']', cleaned)
    # collapse newlines and their indentation
    cleaned = re.sub(r'\n\s*', '', cleaned)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        # Hand the ORIGINAL text to the next, more aggressive pass.
        return content_to_json2(data)
def content_to_json2(data):
    """Second-pass lenient JSON parser.

    Same cleanup as content_to_json plus removal of "#" comments that
    follow a quoted string WITHOUT a comma. Falls back to
    content_to_json3 on failure.
    """
    cleaned = re.sub(r'\[CONTENT\]|\[/CONTENT\]', '', data).strip()
    # "~~~~", #comment -> "~~~~",
    cleaned = re.sub(r'(".*?"),\s*#.*', r'\1,', cleaned)
    # "~~~~" #comment -> "~~~~"
    cleaned = re.sub(r'(".*?")\s*#.*', r'\1', cleaned)
    # ("~~~~",] -> "~~~~"])
    cleaned = re.sub(r',\s*\]', ']', cleaned)
    cleaned = re.sub(r'\n\s*', '', cleaned)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return content_to_json3(data)
def content_to_json3(data):
    """Third-pass lenient JSON parser.

    Same cleanup as content_to_json2, and additionally collapses triple
    quotes to single quote characters and replaces every backslash with
    an apostrophe before parsing. Falls back to content_to_json4 on
    failure.
    """
    cleaned = re.sub(r'\[CONTENT\]|\[/CONTENT\]', '', data).strip()
    # "~~~~", #comment -> "~~~~",
    cleaned = re.sub(r'(".*?"),\s*#.*', r'\1,', cleaned)
    # "~~~~" #comment -> "~~~~"
    cleaned = re.sub(r'(".*?")\s*#.*', r'\1', cleaned)
    # trailing comma before closing bracket
    cleaned = re.sub(r',\s*\]', ']', cleaned)
    cleaned = re.sub(r'\n\s*', '', cleaned)
    cleaned = cleaned.replace('"""', '"')   # collapse triple double quotes
    cleaned = cleaned.replace("'''", "'")   # collapse triple single quotes
    cleaned = cleaned.replace("\\", "'")    # replace backslashes
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return content_to_json4(data)
def content_to_json4(data):
    """Last-resort parser: regex-extract only the "Logic Analysis" and
    "Task list" arrays from *data* and parse each individually.

    Returns:
        dict: {"Logic Analysis": [...], "Task list": [...]} on success,
        {} when the pattern is absent or the extracted fragments are
        not valid JSON. (Previously a malformed fragment raised
        json.JSONDecodeError out of the whole fallback chain.)
    """
    pattern = r'"Logic Analysis":\s*(\[[\s\S]*?\])\s*,\s*"Task list":\s*(\[[\s\S]*?\])'
    match = re.search(pattern, data)
    if not match:
        return {}
    try:
        return {
            "Logic Analysis": json.loads(match.group(1)),
            "Task list": json.loads(match.group(2)),
        }
    except json.JSONDecodeError:
        # Fragments were still malformed; give up gracefully like the
        # no-match case instead of crashing the caller.
        return {}
def extract_code_from_content(content):
    """Return the body of the first fenced code block (``` ... ```)
    in *content*, or "" when no complete fence pair exists."""
    fence_re = r'^```(?:\w+)?\s*\n(.*?)(?=^```)```'
    matches = re.findall(fence_re, content, re.DOTALL | re.MULTILINE)
    return matches[0] if matches else ""
def extract_code_from_content2(content):
    """Return the stripped body of the first ```python fenced block in
    *content*; print a warning and return "" when none is found."""
    found = re.search(r'```python\s*(.*?)```', content, re.DOTALL)
    if not found:
        print("[WARNING] No Python code found.")
        return ""
    return found.group(1).strip()
def format_json_data(data):
    """Render a dict as a human-readable report.

    Each key becomes a "[key]" section preceded by a 40-dash rule;
    list values become "- item" bullet lines, other values are printed
    via str(). Sections are separated by a blank line.
    """
    parts = []
    for key, value in data.items():
        parts.append("-" * 40 + "\n")
        parts.append("[" + key + "]\n")
        if isinstance(value, list):
            parts.extend(f"- {item}\n" for item in value)
        else:
            parts.append(str(value) + "\n")
        parts.append("\n")
    # Join once instead of repeated string concatenation.
    return "".join(parts)
def cal_cost(response_json, model_name):
    """Compute the USD cost of a single LLM API response.

    Prices are USD per 1,000,000 tokens. Cached prompt tokens are billed
    at the (cheaper) cached rate; the remaining prompt tokens at the
    input rate.

    Args:
        response_json: provider response dict containing a "usage" key
            with "prompt_tokens" and "completion_tokens" (and optionally
            cached-token details).
        model_name: pricing-table key; unknown models are billed as free
            (with a warning) rather than raising.

    Returns:
        dict with token counts and the input/cached/output/total costs.

    Bug fixes vs. original: a `None` price tier (e.g. "o1-pro" has
    cached_input=None, "gpt-image-1" has output=None) no longer raises
    TypeError from `tokens * None` — it is billed as 0.0; the duplicate
    "gpt-4o-mini" dict key was removed.
    """
    model_cost = {
        # OpenAI Models
        "gpt-4o": {"input": 2.50, "cached_input": 1.25, "output": 10.00},
        # gpt-4o-realtime-preview
        "gpt-4o-realtime-preview": {"input": 5.00, "cached_input": 2.50, "output": 20.00},
        "gpt-4o-realtime-preview-2024-12-17": {"input": 5.00, "cached_input": 2.50, "output": 20.00},
        "gpt-4o-realtime-preview-2024-10-01": {"input": 5.00, "cached_input": 2.50, "output": 20.00},
        # gpt-4o-mini
        "gpt-4o-mini": {"input": 0.15, "cached_input": 0.075, "output": 0.60},
        "gpt-4o-mini-2024-07-18": {"input": 0.15, "cached_input": 0.075, "output": 0.60},
        # gpt-4o-mini-audio-preview
        "gpt-4o-mini-audio-preview": {"input": 0.15, "cached_input": None, "output": 0.60},
        "gpt-4o-mini-audio-preview-2024-12-17": {"input": 0.15, "cached_input": None, "output": 0.60},
        # gpt-4o-mini-realtime-preview
        "gpt-4o-mini-realtime-preview": {"input": 0.60, "cached_input": 0.30, "output": 2.40},
        "gpt-4o-mini-realtime-preview-2024-12-17": {"input": 0.60, "cached_input": 0.30, "output": 2.40},
        # o1
        "o1": {"input": 15.00, "cached_input": 7.50, "output": 60.00},
        "o1-2024-12-17": {"input": 15.00, "cached_input": 7.50, "output": 60.00},
        "o1-preview-2024-09-12": {"input": 15.00, "cached_input": 7.50, "output": 60.00},
        # o1-pro
        "o1-pro": {"input": 150.00, "cached_input": None, "output": 600.00},
        "o1-pro-2025-03-19": {"input": 150.00, "cached_input": None, "output": 600.00},
        # o3
        "o3": {"input": 10.00, "cached_input": 2.50, "output": 40.00},
        "o3-2025-04-16": {"input": 10.00, "cached_input": 2.50, "output": 40.00},
        # o4-mini
        "o4-mini": {"input": 1.10, "cached_input": 0.275, "output": 4.40},
        "o4-mini-2025-04-16": {"input": 1.10, "cached_input": 0.275, "output": 4.40},
        # o3-mini
        "o3-mini": {"input": 1.10, "cached_input": 0.55, "output": 4.40},
        "o3-mini-2025-01-31": {"input": 1.10, "cached_input": 0.55, "output": 4.40},
        # o1-mini
        "o1-mini": {"input": 1.10, "cached_input": 0.55, "output": 4.40},
        "o1-mini-2024-09-12": {"input": 1.10, "cached_input": 0.55, "output": 4.40},
        # gpt-4o-mini-search-preview
        "gpt-4o-mini-search-preview": {"input": 0.15, "cached_input": None, "output": 0.60},
        "gpt-4o-mini-search-preview-2025-03-11": {"input": 0.15, "cached_input": None, "output": 0.60},
        # gpt-4o-search-preview
        "gpt-4o-search-preview": {"input": 2.50, "cached_input": None, "output": 10.00},
        "gpt-4o-search-preview-2025-03-11": {"input": 2.50, "cached_input": None, "output": 10.00},
        # computer-use-preview
        "computer-use-preview": {"input": 3.00, "cached_input": None, "output": 12.00},
        "computer-use-preview-2025-03-11": {"input": 3.00, "cached_input": None, "output": 12.00},
        # gpt-image-1
        "gpt-image-1": {"input": 5.00, "cached_input": None, "output": None},
        # Google Gemini Models
        "gemini-1.5-flash": {"input": 0.075, "cached_input": 0.01875, "output": 0.30},
        "gemini-1.5-pro": {"input": 1.25, "cached_input": 0.3125, "output": 5.00},
        "gemini-2.0-flash-exp": {"input": 0.0, "cached_input": 0.0, "output": 0.0},
        "gemini-3-flash-preview": {"input": 0.0, "cached_input": 0.0, "output": 0.0},
        "models/gemini-1.5-flash": {"input": 0.075, "cached_input": 0.01875, "output": 0.30},
        "models/gemini-1.5-pro": {"input": 1.25, "cached_input": 0.3125, "output": 5.00},
        "models/gemini-2.0-flash-exp": {"input": 0.0, "cached_input": 0.0, "output": 0.0},
        "models/gemini-3-flash-preview": {"input": 0.0, "cached_input": 0.0, "output": 0.0},
    }
    usage = response_json["usage"]
    prompt_tokens = usage["prompt_tokens"]
    completion_tokens = usage["completion_tokens"]
    # Cached-token reporting differs between providers; default to 0.
    cached_tokens = 0
    if "prompt_tokens_details" in usage:
        cached_tokens = usage["prompt_tokens_details"].get("cached_tokens", 0)
    elif "cached_tokens" in usage:
        cached_tokens = usage["cached_tokens"]
    # Non-cached portion of the prompt is billed at the full input rate.
    actual_input_tokens = prompt_tokens - cached_tokens
    output_tokens = completion_tokens
    # Fall back to a free tier for unknown models rather than raising.
    if model_name not in model_cost:
        print(f"⚠️ Warning: Unknown model '{model_name}', assuming free tier")
        cost_info = {"input": 0.0, "cached_input": 0.0, "output": 0.0}
    else:
        cost_info = model_cost[model_name]

    def _rate(tier):
        # None means the model has no price for this tier (e.g. no
        # cached-input discount); bill it as free instead of crashing.
        price = cost_info[tier]
        return 0.0 if price is None else price

    input_cost = (actual_input_tokens / 1_000_000) * _rate('input')
    cached_input_cost = (cached_tokens / 1_000_000) * _rate('cached_input')
    output_cost = (output_tokens / 1_000_000) * _rate('output')
    total_cost = input_cost + cached_input_cost + output_cost
    return {
        'model_name': model_name,
        'actual_input_tokens': actual_input_tokens,
        'input_cost': input_cost,
        'cached_tokens': cached_tokens,
        'cached_input_cost': cached_input_cost,
        'output_tokens': output_tokens,
        'output_cost': output_cost,
        'total_cost': total_cost,
    }
def load_accumulated_cost(accumulated_cost_file):
    """Return the persisted running cost total from *accumulated_cost_file*,
    or 0.0 when the file does not exist yet."""
    if not os.path.exists(accumulated_cost_file):
        return 0.0
    with open(accumulated_cost_file, "r", encoding="utf-8") as fp:
        return json.load(fp).get("total_cost", 0.0)
def save_accumulated_cost(accumulated_cost_file, cost):
    """Persist the running cost total as JSON: {"total_cost": cost}."""
    payload = {"total_cost": cost}
    with open(accumulated_cost_file, "w", encoding="utf-8") as fp:
        json.dump(payload, fp)
def print_response(completion_json, is_llm=False):
    """Print a completion between banner lines.

    With is_llm=True the payload's raw 'text' field is printed;
    otherwise the first chat choice's message content is used.
    """
    banner = "============================================"
    print(banner)
    if is_llm:
        body = completion_json['text']
    else:
        body = completion_json['choices'][0]['message']['content']
    print(body)
    print(banner + "\n")
def print_log_cost(completion_json, gpt_version, current_stage, output_dir, total_accumulated_cost):
    """Price one response via cal_cost, print a usage summary, append it
    to {output_dir}/cost_info.log, and return the updated running total.

    Args:
        completion_json: provider response dict (passed to cal_cost).
        gpt_version: model name used for pricing.
        current_stage: label printed at the top of the summary.
        output_dir: directory containing cost_info.log (must exist).
        total_accumulated_cost: running total before this response.

    Returns:
        float: total_accumulated_cost plus this response's cost.
    """
    usage = cal_cost(completion_json, gpt_version)
    stage_cost = usage['total_cost']
    new_total = total_accumulated_cost + stage_cost
    summary = "\n".join([
        "🌟 Usage Summary 🌟",
        f"{current_stage}",
        f"🛠️ Model: {usage['model_name']}",
        f"📥 Input tokens: {usage['actual_input_tokens']} (Cost: ${usage['input_cost']:.8f})",
        f"📦 Cached input tokens: {usage['cached_tokens']} (Cost: ${usage['cached_input_cost']:.8f})",
        f"📤 Output tokens: {usage['output_tokens']} (Cost: ${usage['output_cost']:.8f})",
        f"💵 Current total cost: ${stage_cost:.8f}",
        f"🪙 Accumulated total cost so far: ${new_total:.8f}",
        "============================================\n",
    ])
    print(summary)
    with open(f"{output_dir}/cost_info.log", "a", encoding="utf-8") as fp:
        fp.write(summary + "\n")
    return new_total
def num_tokens_from_messages(messages, model="gpt-4o-2024-08-06"):
    """Return the number of tokens used by a list of chat messages.

    Uses tiktoken's per-model encoding plus OpenAI's documented
    per-message/per-name overheads. Unrecognized model strings are
    redirected to a pinned snapshot (with a warning) or raise
    NotImplementedError.
    """
    import tiktoken
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        print("Warning: model not found. Using o200k_base encoding.")
        encoding = tiktoken.get_encoding("o200k_base")
    pinned_models = {
        "gpt-3.5-turbo-0125",
        "gpt-4-0314",
        "gpt-4-32k-0314",
        "gpt-4-0613",
        "gpt-4-32k-0613",
        "gpt-4o-mini-2024-07-18",
        "gpt-4o-2024-08-06",
    }
    if model in pinned_models:
        tokens_per_message, tokens_per_name = 3, 1
    elif "gpt-3.5-turbo" in model:
        print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0125.")
        return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0125")
    elif "gpt-4o-mini" in model:
        print("Warning: gpt-4o-mini may update over time. Returning num tokens assuming gpt-4o-mini-2024-07-18.")
        return num_tokens_from_messages(messages, model="gpt-4o-mini-2024-07-18")
    elif "gpt-4o" in model:
        print("Warning: gpt-4o and gpt-4o-mini may update over time. Returning num tokens assuming gpt-4o-2024-08-06.")
        return num_tokens_from_messages(messages, model="gpt-4o-2024-08-06")
    elif "gpt-4" in model:
        print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
        return num_tokens_from_messages(messages, model="gpt-4-0613")
    else:
        raise NotImplementedError(
            f"""num_tokens_from_messages() is not implemented for model {model}."""
        )
    total = 0
    for message in messages:
        total += tokens_per_message
        for key, value in message.items():
            total += len(encoding.encode(value, allowed_special={"<|endoftext|>"}, disallowed_special=()))
            if key == "name":
                total += tokens_per_name
    # every reply is primed with <|start|>assistant<|message|>
    total += 3
    return total
def read_all_files(directory, allowed_ext, is_print=True):
    """Recursively read files under *directory* and return
    {relative_path: contents}.

    Skipped: dotfiles, anything nested under a dot-directory,
    extensionless files, requirements.txt, and extensions not in
    *allowed_ext* — except README files, which are always kept.
    Files over 200KB are flagged with [BIG] but still read; files
    that fail to read are reported and skipped.

    Args:
        directory: root to walk.
        allowed_ext: container of extensions (with dot, e.g. ".py").
        is_print: when True, print a [SKIP] line for rejected files.

    Bug fix: the hidden-directory check previously split the subpath on
    a literal "/", which never matched Windows separators; it now uses
    os.sep. Files are also opened with explicit UTF-8 encoding instead
    of the platform default.
    """
    all_files_content = {}
    for root, _, files in os.walk(directory):  # recursive traversal
        for filename in files:
            filepath = os.path.join(root, filename)
            relative_path = os.path.relpath(filepath, directory)  # preserve directory structure
            _file_name, ext = os.path.splitext(filename)
            # Skip anything nested under a hidden (dot-prefixed) directory.
            is_skip = False
            if len(directory) < len(root):
                subpath = root[len(directory) + 1:]
                for dirname in subpath.split(os.sep):
                    if dirname.startswith("."):
                        is_skip = True
                        break
            if filename.startswith(".") or "requirements.txt" in filename or ext == "" or is_skip:
                if is_print and ext == "":
                    print(f"[SKIP] {filepath}")
                continue
            if ext not in allowed_ext:
                # READMEs are kept even with a disallowed extension.
                if _file_name.lower() != "readme":
                    if is_print:
                        print(f"[SKIP] {filepath}")
                    continue
            try:
                file_size = os.path.getsize(filepath)  # bytes
                if file_size > 204800:  # > 200KB: warn but still read
                    print(f"[BIG] {filepath} {file_size}")
                with open(filepath, "r", encoding="utf-8") as file:
                    all_files_content[relative_path] = file.read()
            except Exception as e:
                # Best-effort read: report and move on.
                print(e)
                print(f"[SKIP] {filepath}")
    return all_files_content
def read_python_files(directory):
    """Recursively read every .py file under *directory* and return
    {relative_path: file contents}."""
    contents = {}
    for root, _, files in os.walk(directory):  # recursive traversal
        for fname in files:
            if not fname.endswith(".py"):
                continue
            full_path = os.path.join(root, fname)
            # Key by path relative to the walk root to keep structure.
            rel = os.path.relpath(full_path, directory)
            with open(full_path, "r", encoding="utf-8") as fh:
                contents[rel] = fh.read()
    return contents
def extract_json_from_string(text):
    """Return the contents of the first ```json fenced block in *text*,
    or "" (after printing a notice) when no such block exists."""
    found = re.search(r"```json\n(.*?)\n```", text, re.DOTALL)
    if found:
        return found.group(1)
    print("No JSON content found.")
    return ""
def get_now_str():
    """Return the current local time as a compact timestamp string,
    e.g. "20250427_205124" (YYYYMMDD_HHMMSS)."""
    # strftime produces the same string the original built by
    # str()-splitting and character replacement.
    return datetime.now().strftime("%Y%m%d_%H%M%S")