# Final_Assignment_Template / evaluation_langgraph.py
# Author: José Enrique
# LangGraph agent evaluation script (commit d4bb43c)
import os
import json
from dotenv import load_dotenv
from opentelemetry.trace import format_trace_id, get_tracer
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from openinference.instrumentation.smolagents import SmolagentsInstrumentor
from langfuse import observe
from PIL import Image
from langgraph_agent import build_agents
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langfuse import Langfuse
# Load environment variables from .env (Langfuse keys, model API keys, ...).
load_dotenv()
print("Environment variables loaded.")
# initialize langfuse — the client reads its credentials/host from the
# environment variables loaded above.
langfuse = Langfuse()
# Initialize OpenTelemetry Tracer
# NOTE(review): the OpenTelemetry / smolagents instrumentation below is
# currently disabled; the related imports at the top of the file are kept
# so it can be re-enabled by uncommenting these lines.
#trace_provider = TracerProvider()
#trace_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
#trace.set_tracer_provider(trace_provider) # Set as global provider
#tracer = trace.get_tracer(__name__) # Get a tracer instance
#SmolagentsInstrumentor().instrument(tracer_provider=trace_provider)
def add_image(metadata) -> list:
    """Load a task's image attachment, if any, as a PIL image.

    Looks up ``metadata["attachment"]`` under the local ``attachments/``
    directory and, when the file exists and has a jpg/jpeg/png extension,
    opens it and converts it to RGB.

    Args:
        metadata: dataset-item metadata dict; must contain "task_id" and
            may contain "attachment" (a file name).

    Returns:
        A list with zero or one ``PIL.Image.Image`` objects.
    """
    images = []
    task_id = metadata["task_id"]
    attachment = metadata.get("attachment", False)
    if not attachment:
        return images

    # BUG FIX: the original stored the path as ``os.file_path`` — an
    # attribute monkey-patched onto the os module — instead of a local.
    file_path = f"attachments/{attachment}"
    if os.path.exists(file_path):
        print("Attachments found for task_id:", task_id)
    else:
        print(f"No attachments found for task_id: {task_id}")

    # Only image files are loaded; other attachment types are ignored here.
    if os.path.isfile(file_path) and os.path.splitext(file_path)[1].lower() in ['.jpg', '.jpeg', '.png']:
        with open(file_path, "rb") as file:
            # Read the image file and convert it to RGB.
            images.append(Image.open(file).convert("RGB"))
    return images
def run_agent(agent, question, trace_name, metadata):
    """Run one dataset question through the LangGraph agent.

    The item's task_id is appended to the question text so the agent can
    locate the matching attachment itself.

    Args:
        agent: compiled LangGraph graph exposing ``invoke``.
        question: the question text from the dataset item.
        trace_name: kept for interface compatibility (tracing is disabled).
        metadata: dataset-item metadata; must contain "task_id".

    Returns:
        The agent's result dict (with a "messages" key).  On failure a dict
        with the input messages and an "error" description is returned
        instead of raising.
    """
    print("Running agent with question:", question)
    question = question + " The task_id is: " + metadata["task_id"]
    messages = [HumanMessage(content=question)]
    try:
        print("Invoking agent with question:", question)
        result = agent.invoke(
            {"messages": messages, "input_file": None}
        )
        print("Agent messages")
        # Show the conversation produced by the agent.
        for m in result['messages']:
            m.pretty_print()
        return result
    except Exception as e:
        # BUG FIX: the original built an error string here but discarded it
        # and returned the untouched input message list; surface the error
        # to the caller in a dict shaped like the success result instead.
        print(f"Error running agent: {e}")
        return {"messages": messages, "error": f"Error running agent: {e}"}
def simple_evaluation(output, expected_output):
    """Score an agent answer against the expected answer.

    The output is normalised (whitespace, surrounding quotes and dollar
    signs stripped) and both sides are treated as comma-separated lists.
    Each expected item may be satisfied by at most one output item, either
    exactly or case-insensitively.

    Args:
        output: the agent's answer (any type; converted with ``str``).
        expected_output: the reference answer string.

    Returns:
        A float similarity in [0, 1]: the fraction of expected items that
        were matched (1.0 exact / 0.8 case-insensitive for the direct
        string-comparison fallback).
    """
    trimmed_output = str(output).strip().strip('"').strip("$")
    expected_output_list = [item.strip() for item in expected_output.split(",") if item.strip()]
    output_list = [item.strip() for item in trimmed_output.split(",") if item.strip()]

    # Neither side yields any list items (empty/blank answers): fall back
    # to a direct string comparison.
    if not expected_output_list and not output_list:
        if trimmed_output == expected_output:
            return 1.0
        # BUG FIX: the original called the non-existent str.toLower()
        # (a JavaScript method); Python's method is str.lower().
        if expected_output.lower() == trimmed_output.lower():
            return 0.8
        return 0.0

    # Guard: an empty expected list with a non-empty output would divide
    # by zero in the ratio below.
    if not expected_output_list:
        return 0.0

    # Greedy one-to-one matching: exact or case-insensitive matches both
    # count (both reach the original's >= 0.8 per-item threshold).
    matched_items = [False] * len(output_list)
    for expected_item in expected_output_list:
        for i, candidate in enumerate(output_list):
            if matched_items[i]:
                continue
            if expected_item == candidate or expected_item.lower() == candidate.lower():
                matched_items[i] = True
                break
    return sum(matched_items) / len(expected_output_list)
def run_evaluation(agent, langfuse_dataset, run_name, model_id, trace_name, update_dataset=True):
    """Run the agent over a Langfuse dataset and save the answers locally.

    Args:
        agent: compiled LangGraph graph.
        langfuse_dataset: name of the Langfuse dataset to fetch.
        run_name: name under which this run is recorded in Langfuse.
        model_id: model identifier (kept for interface compatibility).
        trace_name: trace name forwarded to ``run_agent``.
        update_dataset: when True, push inputs/outputs/scores to Langfuse
            and flush the client at the end.
    """
    dataset = langfuse.get_dataset(langfuse_dataset)
    responses = []
    for item in dataset.items:
        task_id = item.metadata["task_id"]
        print(f"Processing item with task_id: {task_id}")
        with item.run(
            run_name=run_name
        ) as root_span:
            if update_dataset:
                root_span.update(input=item.input)
            # NOTE(review): debug filter left in place — only this one task
            # is executed; remove this check to evaluate the whole dataset.
            if task_id == "5a0c1adf-205e-4841-a666-7c3ef95def9d":
                try:
                    print("Running agent")
                    output = run_agent(agent, item.input, trace_name, item.metadata)
                    responses.append({"task_id": task_id, "submitted_answer": output})
                    if update_dataset:
                        root_span.update(output=output)
                except Exception as e:
                    print(f"Error running agent: {e}")
                    output = f"Error running agent: {e}"
                # Score the result against the expected output.
                if update_dataset:
                    root_span.score_trace(
                        name="exact_match",
                        value=simple_evaluation(output, item.expected_output),
                    )
    # Flush data to ensure all telemetry is sent before writing local files.
    if update_dataset:
        langfuse.flush()

    # Save the responses to a JSON file.
    print("Saving responses to file...")
    # BUG FIX: the original computed ``langfuse_dataset + run_name`` but then
    # ignored it and wrote the garbled literal "responses_(unknown).json";
    # use the computed name so runs don't overwrite each other.
    filename = langfuse_dataset + run_name
    output_file = f"responses_{filename}.json"
    with open(output_file, "w") as f:
        # default=str: answers may contain LangChain message objects that
        # json cannot serialise natively.
        json.dump(responses, f, indent=4, default=str)
    print(f"Responses saved to {output_file}")
def evaluate():
    """Entry point: build the LangGraph agent and evaluate it on the GAIA dataset."""
    print("Starting agent...")
    agent = build_agents()
    print("Agent built successfully.")
    run_evaluation(
        agent,
        "GAIA_Evaluation_Dataset",
        "Single Langraph agent",
        "OpenAI gpt4o",
        "langraph-trace",
        update_dataset=False,
    )


if __name__ == "__main__":
    evaluate()