import json
import os

from dotenv import load_dotenv
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage
from langfuse import Langfuse, observe
from langgraph_agent import build_agents
from openinference.instrumentation.smolagents import SmolagentsInstrumentor
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace import format_trace_id, get_tracer
from PIL import Image

# Load environment variables (Langfuse / model API keys) before any client init.
load_dotenv()
print("Environment variables loaded.")

# Langfuse client used for dataset runs, scoring and flushing telemetry.
langfuse = Langfuse()

# NOTE(review): OpenTelemetry / Smolagents instrumentation is currently
# disabled. To re-enable, build a TracerProvider with an OTLP exporter and
# call SmolagentsInstrumentor().instrument(tracer_provider=...).

# File extensions treated as image attachments by add_image().
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png"}


def add_image(metadata) -> list:
    """Load any image attachment referenced by a dataset item's metadata.

    Looks for ``attachments/<attachment>`` and, when it is a jpg/jpeg/png
    file, opens it and converts it to RGB.

    Args:
        metadata: Dataset-item metadata dict containing ``task_id`` and an
            optional ``attachment`` file name.

    Returns:
        A list with zero or one ``PIL.Image.Image`` objects.
    """
    images = []
    task_id = metadata["task_id"]
    attachment = metadata.get("attachment", False)
    if not attachment:
        return images

    # BUG FIX: the original stored the path as an attribute on the ``os``
    # module itself (``os.file_path = ...``), mutating global state; a plain
    # local variable is what was intended.
    file_path = f"attachments/{attachment}"
    if os.path.exists(file_path):
        print("Attachments found for task_id:", task_id)
    else:
        print(f"No attachments found for task_id: {task_id}")
        return images

    # Only image files are loaded; other attachment formats are ignored here.
    if os.path.isfile(file_path) and os.path.splitext(file_path)[1].lower() in IMAGE_EXTENSIONS:
        with open(file_path, "rb") as file:
            images.append(Image.open(file).convert("RGB"))
    return images


# @observe()
def run_agent(agent, question, trace_name, metadata):
    """Run the LangGraph agent on a single dataset question.

    Args:
        agent: Compiled LangGraph agent (from ``build_agents``).
        question: Question text from the dataset item.
        trace_name: Name for the (currently disabled) tracing span.
        metadata: Dataset-item metadata; its ``task_id`` is appended to the
            prompt so downstream tools can locate attachments.

    Returns:
        The agent's resulting state dict (with a ``messages`` list) on
        success, or an ``"Error running agent: ..."`` string on failure.
    """
    print("Running agent with question:", question)
    question = question + " The task_id is: " + metadata["task_id"]
    messages = [HumanMessage(content=question)]
    try:
        print("Invoking agent with question:", question)
        result = agent.invoke({"messages": messages, "input_file": None})
        print("Agent messages")
        for m in result["messages"]:
            m.pretty_print()
        return result
    except Exception as e:
        # BUG FIX: the original caught the error, built an ``output`` string
        # and then discarded it, returning the unmodified input messages.
        # Surface the error to the caller instead.
        print(f"Error running agent: {e}")
        return f"Error running agent: {e}"


def simple_evaluation(output, expected_output):
    """Score an agent answer against the expected answer.

    For single-valued answers: exact match scores 1.0, a case-insensitive
    match scores 0.8, anything else 0.0. Comma-separated answers are compared
    item-by-item (order-independent) and the score is the fraction of
    expected items matched; case-insensitive item matches count fully.

    Args:
        output: The agent's answer (any type; stringified before comparison).
        expected_output: The reference answer string.

    Returns:
        A float score in [0.0, 1.0].
    """
    trimmed_output = str(output).strip().strip('"').strip("$")
    expected_items = [item.strip() for item in expected_output.split(",") if item.strip()]
    output_items = [item.strip() for item in trimmed_output.split(",") if item.strip()]

    # Scalar comparison when either side is not a multi-item list.
    if len(expected_items) <= 1 or len(output_items) <= 1:
        if trimmed_output == expected_output:
            return 1.0
        # BUG FIX: the original called the non-existent ``str.toLower()``
        # (a JavaScript-ism) which raises AttributeError; use ``lower()``.
        if expected_output.lower() == trimmed_output.lower():
            return 0.8
        return 0.0

    # Greedy one-to-one matching: each output item can satisfy at most one
    # expected item.
    matched = [False] * len(output_items)
    for expected_item in expected_items:
        for i, candidate in enumerate(output_items):
            if matched[i]:
                continue
            if expected_item == candidate or expected_item.lower() == candidate.lower():
                matched[i] = True
                break
    return sum(matched) / len(expected_items)


def run_evaluation(agent, langfuse_dataset, run_name, model_id, trace_name, update_dataset=True):
    """Run the agent over a Langfuse dataset, score results, and save them.

    Args:
        agent: Compiled LangGraph agent.
        langfuse_dataset: Name of the Langfuse dataset to evaluate against.
        run_name: Name recorded for this dataset run in Langfuse.
        model_id: Model identifier (currently only used for bookkeeping).
        trace_name: Trace name passed through to ``run_agent``.
        update_dataset: When True, push inputs/outputs/scores back to
            Langfuse; when False, run purely locally.
    """
    dataset = langfuse.get_dataset(langfuse_dataset)
    responses = []

    for item in dataset.items:
        print(f"Processing item with task_id: {item.metadata['task_id']}")
        with item.run(run_name=run_name) as root_span:
            if update_dataset:
                root_span.update(input=item.input)
            task_id = item.metadata["task_id"]
            # NOTE(review): hard-coded task filter — only this single task is
            # evaluated. Remove the condition to run the full dataset.
            if task_id == "5a0c1adf-205e-4841-a666-7c3ef95def9d":
                try:
                    print("Running agent")
                    output = run_agent(agent, item.input, trace_name, item.metadata)
                    responses.append({"task_id": task_id, "submitted_answer": output})
                    if update_dataset:
                        root_span.update(output=output)
                except Exception as e:
                    print(f"Error running agent: {e}")
                    output = f"Error running agent: {e}"
                # Score the result against the expected output.
                if update_dataset:
                    root_span.score_trace(
                        name="exact_match",
                        value=simple_evaluation(output, item.expected_output),
                    )

    # Flush data to ensure all telemetry is sent.
    if update_dataset:
        langfuse.flush()

    # Save the responses to a JSON file.
    print("Saving responses to file...")
    responses = [
        {"task_id": r["task_id"], "submitted_answer": r["submitted_answer"]}
        for r in responses
    ]
    # BUG FIX: the original computed ``filename = langfuse_dataset+run_name``
    # but then wrote to the literal "responses_(unknown).json"; use the
    # intended dataset+run name.
    filename = langfuse_dataset + run_name
    output_file = f"responses_{filename}.json"
    with open(output_file, "w") as f:
        # default=str: agent outputs are message objects that are not natively
        # JSON-serializable; stringifying them is the deliberate fallback.
        json.dump(responses, f, indent=4, default=str)
    print(f"Responses saved to {output_file}")


def evaluate():
    """Build the agent and run it against the GAIA evaluation dataset."""
    print("Starting agent...")
    agent = build_agents()
    print("Agent built successfully.")
    run_evaluation(
        agent,
        "GAIA_Evaluation_Dataset",
        "Single Langraph agent",
        "OpenAI gpt4o",
        "langraph-trace",
        update_dataset=False,
    )


if __name__ == "__main__":
    evaluate()