Spaces:
Sleeping
Sleeping
File size: 7,412 Bytes
9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e d4bb43c 9ccff9e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
import os
import json
from dotenv import load_dotenv
from opentelemetry.trace import format_trace_id, get_tracer
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from openinference.instrumentation.smolagents import SmolagentsInstrumentor
from langfuse import observe
from PIL import Image
from langgraph_agent import build_agents
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langfuse import Langfuse
# Load environment variables from .env (Langfuse/OpenAI credentials are
# presumably read from there by the clients below — TODO confirm).
load_dotenv()
print("Environment variables loaded.")
# Initialize the Langfuse client; with no arguments it picks up its
# configuration (keys, host) from the environment loaded above.
langfuse = Langfuse()
# NOTE(review): OpenTelemetry / Smolagents instrumentation is currently
# disabled. Uncomment the block below to re-enable OTLP trace export.
#trace_provider = TracerProvider()
#trace_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
#trace.set_tracer_provider(trace_provider) # Set as global provider
#tracer = trace.get_tracer(__name__) # Get a tracer instance
#SmolagentsInstrumentor().instrument(tracer_provider=trace_provider)
def add_image(metadata: dict) -> list:
    """Load image attachments for a task, if any.

    Looks for ``attachments/<attachment>`` based on the task metadata and,
    when the file is a .jpg/.jpeg/.png image, opens it as an RGB PIL image.

    Args:
        metadata: Task metadata; must contain "task_id" and may contain an
            "attachment" filename.

    Returns:
        A list of PIL RGB images (empty when there is no usable attachment).
    """
    images = []
    task_id = metadata["task_id"]
    attachment = metadata.get("attachment")
    if not attachment:
        # Nothing to load for this task.
        return images

    # BUG FIX: the original assigned the path to ``os.file_path``, mutating
    # the ``os`` module globally (and crashing with AttributeError on the
    # no-attachment path). Use a plain local variable instead.
    file_path = f"attachments/{attachment}"
    if os.path.exists(file_path):
        print("Attachments found for task_id:", task_id)
    else:
        print(f"No attachments found for task_id: {task_id}")

    # Only image files are loaded here; other attachment types are ignored.
    if os.path.isfile(file_path) and os.path.splitext(file_path)[1].lower() in ('.jpg', '.jpeg', '.png'):
        with open(file_path, "rb") as file:
            # Convert to RGB so downstream consumers get a consistent mode.
            images.append(Image.open(file).convert("RGB"))
    return images
#@observe()
def run_agent(agent, question, trace_name, metadata):
    """Run the agent on a single question and return its response.

    Args:
        agent: A LangGraph agent exposing ``invoke``.
        question: The question text to ask.
        trace_name: Trace name (currently unused; kept for interface
            compatibility with the disabled tracing setup above).
        metadata: Task metadata; "task_id" is appended to the question so
            the agent can locate its attachments itself.

    Returns:
        The agent's response dict (with a "messages" key) on success, or the
        original input message list if invocation failed.
    """
    print("Running agent with question:", question)
    # The agent resolves attachments via the task_id, so pass it along.
    question = question + " The task_id is: " + metadata["task_id"]
    messages = [HumanMessage(content=question)]
    try:
        print("Invoking agent with question:", question)
        messages = agent.invoke(
            {"messages": messages, "input_file": None}
        )
    except Exception as e:
        # Best-effort: report the failure but keep the evaluation running.
        # (The original also built an unused ``output`` error string here.)
        print(f"Error running agent: {e}")
    else:
        # Only display the conversation when invocation succeeded; a
        # pretty_print error must not be misreported as an agent error.
        print("Agent messages")
        for m in messages['messages']:
            m.pretty_print()
    return messages
def simple_evaluation(output, expected_output):
    """Score an answer against the expected answer.

    Both values are treated as comma-separated lists; items are matched
    one-to-one and order-independently. An exact item match counts fully, a
    case-insensitive match (score 0.8) is also accepted as a match.

    Args:
        output: The agent's answer (any type; stringified before comparison).
        expected_output: The reference answer string.

    Returns:
        A float in [0, 1]: the fraction of expected items matched, or a
        direct string-similarity score when both sides parse to empty lists.
    """
    trimmed_output = str(output).strip().strip('"').strip("$")
    expected_output_list = [item.strip() for item in expected_output.split(",") if item.strip()]
    output_list = [item.strip() for item in trimmed_output.split(",") if item.strip()]

    # Degenerate case: neither side has list items -> compare raw strings.
    if not expected_output_list and not output_list:
        if trimmed_output == expected_output:
            return 1.0
        # BUG FIX: the original called the non-existent str.toLower();
        # Python's method is str.lower().
        if expected_output.lower() == trimmed_output.lower():
            return 0.8
        return 0.0

    # BUG FIX: guard against ZeroDivisionError when only the expected side
    # is empty (the original divided by len(expected_output_list) below).
    if not expected_output_list:
        return 0.0

    # Greedy one-to-one matching of expected items against output items.
    matched_items = [False] * len(output_list)
    for item in expected_output_list:
        for i, item2 in enumerate(output_list):
            if matched_items[i]:
                continue
            # Exact match (1.0) or case-insensitive match (0.8) both clear
            # the original >= 0.8 acceptance threshold.
            if item == item2 or item.lower() == item2.lower():
                matched_items[i] = True
                break
    return sum(matched_items) / len(expected_output_list)
def run_evaluation(agent, langfuse_dataset, run_name, model_id, trace_name, update_dataset=True):
    """Run the agent over a Langfuse dataset, score and save the results.

    Args:
        agent: The LangGraph agent to evaluate.
        langfuse_dataset: Name of the Langfuse dataset to fetch.
        run_name: Name of this evaluation run (also part of the output filename).
        model_id: Model identifier (recorded for reference; not used directly).
        trace_name: Trace name forwarded to run_agent.
        update_dataset: When True, push inputs/outputs/scores back to Langfuse.

    Side effects:
        Writes the collected responses to ``responses_<dataset><run_name>.json``.
    """
    dataset = langfuse.get_dataset(langfuse_dataset)
    responses = []
    for item in dataset.items:
        print(f"Processing item with task_id: {item.metadata['task_id']}")
        with item.run(run_name=run_name) as root_span:
            if update_dataset:
                root_span.update(input=item.input)
            task_id = item.metadata["task_id"]
            # NOTE(review): debug filter — only this one task is executed.
            # Remove the condition to evaluate the full dataset.
            if task_id == "5a0c1adf-205e-4841-a666-7c3ef95def9d":
                try:
                    print("Running agent")
                    output = run_agent(agent, item.input, trace_name, item.metadata)
                except Exception as e:
                    print(f"Error running agent: {e}")
                    # BUG FIX: the original built this error string but never
                    # recorded it; keep it so the failure appears in results.
                    output = f"Error running agent: {e}"
                responses.append({"task_id": task_id, "submitted_answer": output})
                if update_dataset:
                    root_span.update(output=output)
                    # Score the result against the expected output. Scoring
                    # now happens only where ``output`` is bound (the original
                    # risked a NameError on filtered-out items).
                    root_span.score_trace(
                        name="exact_match",
                        value=simple_evaluation(output, item.expected_output),
                    )
    # Flush data to ensure all telemetry is sent.
    if update_dataset:
        langfuse.flush()
    # Save the responses to a JSON file.
    print("Saving responses to file...")
    filename = langfuse_dataset + run_name
    # BUG FIX: the original computed ``filename`` but then wrote to a
    # hard-coded "responses_(unknown).json" literal.
    output_file = f"responses_{filename}.json"
    with open(output_file, "w") as f:
        json.dump(responses, f, indent=4)
    print(f"Responses saved to {output_file}")
def evaluate():
    """Build the LangGraph agent and run it against the GAIA dataset.

    Runs without pushing results back to Langfuse (update_dataset=False).
    """
    print("Starting agent...")
    agent = build_agents()
    print("Agent built successfully.")
    run_evaluation(
        agent,
        "GAIA_Evaluation_Dataset",
        "Single Langraph agent",
        "OpenAI gpt4o",
        "langraph-trace",
        update_dataset=False,
    )
# Script entry point: build the agent and run the evaluation once.
if __name__ == "__main__":
    evaluate()