Spaces:
Running
Running
| from langsmith_agent_graph.component_types import KnowledgeGraph | |
| from agentgraph.shared.component_types import KnowledgeGraph as KG , Entity, Relation | |
| from pydantic_ai import Agent | |
| from pydantic_ai.models.openai import OpenAIModel | |
| from typing import List | |
| import json | |
| from core.azure import azure_provider | |
| from core.logger import setup_logger | |
| from pydantic import BaseModel, Field | |
# Module-level logger for this module, obtained from the project's core.logger helper.
logger = setup_logger(__name__)
def _slice_run_lines(run_text: str, line_start: int, line_end: int) -> str:
    """Return lines ``line_start``..``line_end`` (1-based, inclusive) of ``run_text``.

    Line numbers are assumed to follow the ``<L{n}>`` markers emitted by
    ``get_annotated_run`` (which numbers lines from 1) — TODO confirm against
    the reference schema if it changes.
    """
    lines = run_text.split('\n')
    # Clamp so a start of 0 or a negative value cannot wrap around via a negative index.
    return '\n'.join(lines[max(line_start - 1, 0):max(line_end, 0)])


def _resolve_refs(refs, content_mapping: dict) -> str:
    """Join the referenced text of every ref whose run is present in ``content_mapping``."""
    return '\n'.join(
        _slice_run_lines(content_mapping[ref.run_id][0], ref.line_start, ref.line_end)
        for ref in refs
        if ref.run_id in content_mapping
    )


def get_formatted_kg(kg: KnowledgeGraph, content_mapping: dict) -> KG:
    """Convert the working KnowledgeGraph into the shared ``KG`` schema.

    Each entity's ``raw_prompt_ref`` and each relation's
    ``interaction_prompt_ref`` are resolved to the referenced *lines* of the
    serialised run JSON stored in ``content_mapping``. References to runs
    missing from the mapping are skipped.

    Args:
        kg: Graph accumulated by the extraction loop.
        content_mapping: ``run_id -> (run_json, annotated_run_json)``, as filled
            in by ``update_knowledge_graph_from_trace``; only ``run_json`` is read.

    Returns:
        A ``KG`` whose entities/relations carry the resolved prompt text.
    """
    return KG(
        entities=[
            Entity(
                id=entity.id,
                entity_type=entity.type,
                name=entity.name,
                description=entity.description,
                raw_prompt=_resolve_refs(entity.raw_prompt_ref, content_mapping),
            )
            for entity in kg.entities
        ],
        relations=[
            Relation(
                id=relation.id,
                description=relation.description,
                source=relation.source,
                target=relation.target,
                type=relation.type,
                interaction_prompt=_resolve_refs(relation.interaction_prompt_ref, content_mapping),
            )
            for relation in kg.relations
        ],
        system_name=kg.system_name,
        system_summary=kg.system_summary,
    )
def get_runs_summary(runs: List[dict]) -> str:
    """Serialise a compact overview of the runs as pretty-printed JSON.

    Only the ``id``, ``name``, ``run_type``, ``start_time`` and ``end_time``
    keys of each run are kept, in that order; a missing key raises ``KeyError``.
    """
    summary_keys = ('id', 'name', 'run_type', 'start_time', 'end_time')
    overview = []
    for entry in runs:
        overview.append({key: entry[key] for key in summary_keys})
    return json.dumps(overview, indent=2)
def get_runs_relations(runs: List[dict]) -> str:
    """Describe every parent/child link between runs, one per line.

    Each line reads ``(<parent_id>, 'parent of', <child_id>)``, with one line
    per entry of a run's ``parent_run_ids``. Runs without parents contribute
    nothing.

    Args:
        runs: Run dicts with at least an ``id`` key.

    Returns:
        The newline-joined relation lines, or ``""`` if there are none.
    """
    relation_lines = []
    for run in runs:
        # Tolerate a missing or None parent list (treated as a run without parents).
        for parent_id in run.get('parent_run_ids') or []:
            relation_lines.append(f"({parent_id}, 'parent of', {run['id']})")
    return '\n'.join(relation_lines)
def get_annotated_run(id2run: dict, run: dict) -> tuple[str, str]:
    """Serialise a run and build a line-numbered copy of it for the LLM prompt.

    Args:
        id2run: ``run_id -> run dict`` lookup.
        run: The run to serialise. If it carries ``grouped_children`` (a copy
            made by ``group_chain_runs``), it is serialised as-is so its
            descendants are included; otherwise ``id2run[run['id']]`` is used.

    Returns:
        ``(run_json, annotated_run_json)``, where ``run_json`` is
        ``json.dumps(..., indent=2)`` and every line of ``annotated_run_json``
        is ``"<L{n}> "`` (n starting at 1) followed by that line with
        surrounding whitespace stripped.
    """
    # In extract_knowledge_graph, id2run is built from the ungrouped runs, while
    # grouped runs are copies that add 'grouped_children'. Reading only from
    # id2run would silently drop every grouped child from the prompt.
    source = run if 'grouped_children' in run else id2run[run['id']]
    current_run = json.dumps(source, indent=2)
    annotated_current_run = '\n'.join(
        f"<L{number}> {text.strip()}"
        for number, text in enumerate(current_run.split('\n'), start=1)
    )
    return current_run, annotated_current_run
def get_parents(id2run: dict, run: dict) -> str:
    """Return the pretty-printed JSON of each parent of ``run``, newline-joined.

    Parent ids come from ``run['parent_run_ids']``. A missing or None list, and
    parent ids not present in ``id2run`` (e.g. a parent outside the fetched
    runs), are skipped instead of raising ``KeyError``.

    Returns:
        The joined parent JSON, or ``""`` when no known parent exists.
    """
    parent_ids = run.get('parent_run_ids') or []
    return '\n'.join(
        json.dumps(id2run[parent_id], indent=2)
        for parent_id in parent_ids
        if parent_id in id2run
    )
# System prompt given to the shared agent below.
system_prompt = f"""
You are a Agent Trace Graph Analyzer Expert. Your goal is to analyze a Knowledge Graph from a given trace of a LLM agent.
"""
# Single module-level agent, constructed at import time, used by every LLM call in
# this module. The model "gpt-4.1-mini" is reached through `azure_provider`
# (presumably an Azure OpenAI provider — confirm in core.azure). retries=5 is
# pydantic_ai's default retry budget, which also applies to structured-output
# validation failures.
agent = Agent(
    model=OpenAIModel("gpt-4.1-mini", provider=azure_provider),
    system_prompt=system_prompt,
    retries=5,
)
async def extract_knowledge_graph(runs: List[dict]) -> KG:
    """Build a knowledge graph for a trace by sending its runs to the agent step by step.

    ``runs[0]`` is sent on its own. The remaining runs are collapsed with
    ``group_chain_runs`` so that chain/tool runs with descendants form a single
    step. After each step, the graph returned by
    ``update_knowledge_graph_from_trace`` becomes the running graph.

    Args:
        runs: Run dicts of one trace, each with at least the keys read by
            ``get_runs_summary`` (id, name, run_type, start_time, end_time).
            ``runs[0]`` is presumably the trace root — TODO confirm with caller.

    Returns:
        The final graph converted by ``get_formatted_kg``, with line
        references resolved to the serialised run text.

    Raises:
        IndexError: if ``runs`` is empty (``runs[0]`` is required).

    Side effects:
        Writes the grouping to ``grouped_runs.json`` in the current working
        directory, overwriting any existing file.
    """
    id2run = {run['id']: run for run in runs}
    runs_info = get_runs_summary(runs)
    runs_relations = get_runs_relations(runs)
    grouped_runs = [runs[0]] + group_chain_runs(runs[1:])
    # Debug dump of exactly what will be sent to the model, one entry per step.
    with open("grouped_runs.json", "w", encoding="utf-8") as f:
        json.dump(grouped_runs, f, indent=4)

    knowledge_graph = None
    # Filled by update_knowledge_graph_from_trace: run_id -> (run_json, annotated_run_json).
    run_mapping = {}
    total = len(grouped_runs)
    for step, run in enumerate(grouped_runs, start=1):
        # 1-based so the last step logs "n/n" rather than "n-1/n".
        logger.info(f"Run: {step}/{total}")
        knowledge_graph = await update_knowledge_graph_from_trace(
            knowledge_graph, id2run, runs_info, runs_relations, run_mapping, run
        )
    return get_formatted_kg(knowledge_graph, run_mapping)
async def fix_knowledge_graph(kg, message_history, errors):
    """Ask the agent to repair relations whose endpoints are not known entities.

    The follow-up prompt continues the conversation in ``message_history`` and
    contains the full current graph plus the validation error lines.

    Args:
        kg: The graph that failed validation.
        message_history: Messages of the preceding agent run to continue from.
        errors: Error strings, e.g. from ``validate_knowledge_graph``.

    Returns:
        The ``KnowledgeGraph`` produced by the agent (not re-validated here).
    """
    graph_json = kg.model_dump_json(indent=2)
    error_lines = '\n'.join(errors)
    user_prompt = f"""
## Knowledge Graph:
{graph_json}
## Invalid entities and relations:
{error_lines}
Instruction:
- Source and target entity ids must exist in the Knowledge Graph (list of entity ids).
- Write the fixed Knowledge Graph in the same format as the previous one.
"""
    deterministic_settings = {'temperature': 0.0, 'seed': 42}
    result = await agent.run(
        user_prompt,
        message_history=message_history,
        model_settings=deterministic_settings,
        output_type=KnowledgeGraph,
    )
    return result.output
class FormattedOutput(BaseModel):
    """Structured output requested from the agent for one extraction step."""

    # NOTE(review): this description asks for "detailed step-by-step" reasoning
    # but also "under 50 tokens"; it is sent to the model verbatim.
    thoughts: str = Field(description="Your detailed step-by-step reasoning. Keep the response concise, under 50 tokens. Use evidence-based reasoning")
    knowledge_graph: KnowledgeGraph = Field(description="The Knowledge Graph to be updated based on the reasoning")


async def update_knowledge_graph_from_trace(knowledge_graph, id2run, runs_info, runs_relations, run_mapping, run):
    """Run one extraction step for ``run`` and return the updated graph.

    Args:
        knowledge_graph: Graph accumulated so far, or ``None`` on the first step.
        id2run: ``run_id -> run dict`` lookup for parents and serialisation.
        runs_info: JSON overview of all runs (from ``get_runs_summary``).
        runs_relations: Parent/child lines (from ``get_runs_relations``).
        run_mapping: Mutated in place: ``run_mapping[run['id']]`` is set to
            ``(run_json, annotated_run_json)`` for later reference resolution.
        run: The (possibly grouped) run to analyse.

    Returns:
        The merged graph. If it fails ``validate_knowledge_graph``, the output
        of ``fix_knowledge_graph`` is returned instead.
    """
    parent_runs = get_parents(id2run, run)
    current_run, annotated_current_run = get_annotated_run(id2run, run)
    run_mapping[run['id']] = (current_run, annotated_current_run)
    task_description = f"""
## Entities:
Agent: An intelligent system that interacts with the Input, performs Tasks, calls a Language Model (LLM) to generate text when needed, and uses Tools to complete Tasks.
Input: The query or data provided to the Agent to initiate the workflow.
Output: The final result generated by the Agent after completing the Task.
Task: The specific action or process assigned to the Agent based on the Input.
Tool: External resources or services accessed by the Agent to assist in performing the Task.
Human: The recipient of the Output who may also intervene in the Task.
## Relationships:
CONSUMED_BY: The Input is received and processed by the Agent.
PERFORMS: The Agent executes the Task based on the Input.
USES: The Agent accesses Tools as needed to complete the Task.
PRODUCES: The Task, once completed by the Agent, generates the Output.
DELIVERS_TO: The Output is provided to the Human.
INTERVENES: The Human or Agent may intervene in the Task to guide or modify its execution.
## Task:
Create/Update the Knowledge Graph based on the following information. If the Knowledge Graph already exists, you can update the components or add new components (entities or relations) to the graph.
## Instructions:
- Agent entities are capable to invoke runs with run_type=llm to generate text. Calling the llm is not a tool.
- When a Run with run_type=tool is a child of another Run with run_type=tool, both belong to the same Tool entity. Treat them as a single Tool—update the existing Tool entity's properties (e.g., append details from the child run to the parent's description or references) and do not create a new Tool entity. This is because child tool runs are often internal implementation details of the parent tool, and the Agent or user interacts only with the parent tool, not directly with the child.
- If the entity already exists, you can append more references for the entity in the raw_prompt_ref field.
- Always check the hierarchy in Runs and Runs Relations to identify parent-child relationships before creating or updating entities.
- You will return always the updated system name and summary.
- Return only new or updated entities and relations. Ignore entities and relations that are not new or updated.
## Runs:
{runs_info}
## Runs Relations:
{runs_relations}
## Previous Knowledge Graph:
{knowledge_graph.model_dump_json(indent=2) if knowledge_graph else "No knowledge graph yet"}
## Parents:
{parent_runs if parent_runs else "No parents"}
## Current run:
{annotated_current_run}
"""
    # Reasoning
    response = await agent.run(task_description, model_settings={'temperature': 0.0, 'seed': 42}, output_type=FormattedOutput)
    knowledge_graph_updates = response.output.knowledge_graph
    logger.debug(f"Raw response: {response.output.thoughts}")
    # The prompt asks for new/updated components only, so merge them into the existing graph.
    updated_knowledge_graph = update_knowledge_graph(knowledge_graph, knowledge_graph_updates) if knowledge_graph else knowledge_graph_updates

    # Validation
    errors = validate_knowledge_graph(updated_knowledge_graph)
    if not errors:
        return updated_knowledge_graph
    logger.info(f"Validation errors: {errors}")
    # NOTE(review): the fixed graph replaces the merged graph wholesale; if the
    # model returns only a partial graph, components are lost — TODO confirm.
    fixed_knowledge_graph = await fix_knowledge_graph(updated_knowledge_graph, response.new_messages(), errors)
    # The fix is not guaranteed to succeed; surface anything still dangling.
    remaining_errors = validate_knowledge_graph(fixed_knowledge_graph)
    if remaining_errors:
        logger.warning(f"Validation errors remain after fix: {remaining_errors}")
    return fixed_knowledge_graph
def equal_content_reference(obj1, obj2):
    """Return True when two references point at the same run and the same line span.

    Compares ``run_id``, then ``line_start``, then ``line_end``, stopping at the
    first mismatch.
    """
    compared_fields = ('run_id', 'line_start', 'line_end')
    return all(getattr(obj1, field) == getattr(obj2, field) for field in compared_fields)
def _merge_refs(existing_refs, incoming_refs):
    """Return ``existing_refs`` followed by incoming refs not already present (per ``equal_content_reference``)."""
    additions = [
        ref for ref in incoming_refs
        if not any(equal_content_reference(ref, known) for known in existing_refs)
    ]
    return existing_refs + additions


def update_knowledge_graph(knowledge_graph: KnowledgeGraph, knowledge_graph_updates: KnowledgeGraph) -> KnowledgeGraph:
    """Merge a partial update produced by the model into the existing graph.

    For an entity/relation whose id already exists, only ``importance`` and
    ``description`` are replaced, and new, non-duplicate references are
    appended (``raw_prompt_ref`` / ``interaction_prompt_ref``). Other fields
    are kept. Entities/relations with unseen ids are appended after the
    existing ones, in the order they appear in the update. ``system_name`` and
    ``system_summary`` are taken from the update when they are non-empty.

    Returns:
        A new graph (via ``model_copy``); the inputs are not mutated.
    """
    existing_entity_ids = {e.id for e in knowledge_graph.entities}
    entity_updates = {e.id: e for e in knowledge_graph_updates.entities if e.id in existing_entity_ids}
    new_entities = {e.id: e for e in knowledge_graph_updates.entities if e.id not in existing_entity_ids}

    merged_entities = []
    for entity in knowledge_graph.entities:
        update = entity_updates.get(entity.id)
        if update is None:
            merged_entities.append(entity)
            continue
        merged_entities.append(entity.model_copy(
            update={
                "importance": update.importance,
                "description": update.description,
                "raw_prompt_ref": _merge_refs(entity.raw_prompt_ref, update.raw_prompt_ref),
            }
        ))
    # Previously the new-id branch sat inside the loop over *existing* entities
    # and was unreachable, so every entity introduced after the first step was dropped.
    merged_entities.extend(new_entities.values())

    existing_relation_ids = {r.id for r in knowledge_graph.relations}
    relation_updates = {r.id: r for r in knowledge_graph_updates.relations if r.id in existing_relation_ids}
    new_relations = {r.id: r for r in knowledge_graph_updates.relations if r.id not in existing_relation_ids}

    merged_relations = []
    for relation in knowledge_graph.relations:
        update = relation_updates.get(relation.id)
        if update is None:
            merged_relations.append(relation)
            continue
        merged_relations.append(relation.model_copy(
            update={
                "description": update.description,
                "importance": update.importance,
                "interaction_prompt_ref": _merge_refs(relation.interaction_prompt_ref, update.interaction_prompt_ref),
            }
        ))
    merged_relations.extend(new_relations.values())

    return knowledge_graph.model_copy(
        update={
            "system_name": knowledge_graph_updates.system_name or knowledge_graph.system_name,
            "system_summary": knowledge_graph_updates.system_summary or knowledge_graph.system_summary,
            "entities": merged_entities,
            "relations": merged_relations,
        }
    )
def validate_knowledge_graph(kg: KnowledgeGraph) -> List[str]:
    """Report relations whose source or target id is not an entity of ``kg``.

    Returns:
        One message per dangling endpoint. All invalid-source messages come
        first, then all invalid-target messages, each in relation order. The
        list is empty when the graph is consistent.
    """
    entity_ids = {entity.id for entity in kg.entities}
    errors = [
        f"Invalid Relation {r.id} with source entity {r.source}"
        for r in kg.relations
        if r.source not in entity_ids
    ]
    errors.extend(
        f"Invalid Relation {r.id} with target entity {r.target}"
        for r in kg.relations
        if r.target not in entity_ids
    )
    return errors
def group_chain_runs(runs: List[dict]) -> List[dict]:
    """
    Groups chain runs with their children to optimize processing.

    Every chain/tool run (``run_type`` compared case-insensitively) that has
    descendants in ``runs`` is emitted as a shallow copy carrying
    ``grouped_children`` and ``total_grouped_runs`` (descendants + 1).
    Descendants are listed in depth-first pre-order, each at most once.
    Descendants of an emitted group are never emitted separately, even if
    they appear in ``runs`` before their ancestor. Other runs are emitted
    unchanged (same dict object), in input order.

    Args:
        runs: List of run dictionaries with fields like id, run_type, parent_run_ids, etc.
            A missing or None ``run_type`` / ``parent_run_ids`` is tolerated.

    Returns:
        List of grouped runs where chain runs include their children. The
        input dicts are not mutated.
    """
    # Build parent -> children links.
    children_by_parent = {}
    for run in runs:
        for parent_id in run.get('parent_run_ids') or []:
            children_by_parent.setdefault(parent_id, []).append(run)

    groupable_types = {'chain', 'tool'}  # tool can also have complex nested operations
    processed_run_ids = set()
    absorbed_run_ids = set()  # runs that ended up inside some group
    grouped_runs = []

    def collect_all_children(run_id: str) -> List[dict]:
        """Collect all descendants of run_id in pre-order; each run once, cycle-safe."""
        collected = []
        seen = {run_id}

        def visit(parent_id):
            for child in children_by_parent.get(parent_id, []):
                # A run can be reachable via several parent ids (or a cycle).
                if child['id'] in seen:
                    continue
                seen.add(child['id'])
                collected.append(child)
                visit(child['id'])

        visit(run_id)
        return collected

    for run in runs:
        run_id = run['id']
        if run_id in processed_run_ids:
            continue
        run_type = (run.get('run_type') or '').lower()
        if run_type in groupable_types and run_id in children_by_parent:
            all_children = collect_all_children(run_id)
            for child in all_children:
                processed_run_ids.add(child['id'])
                absorbed_run_ids.add(child['id'])
            grouped_run = run.copy()
            grouped_run['grouped_children'] = all_children
            grouped_run['total_grouped_runs'] = len(all_children) + 1  # +1 for parent
            grouped_runs.append(grouped_run)
        else:
            # Keep individual runs (llm, standalone tools, chains without children).
            grouped_runs.append(run)
        processed_run_ids.add(run_id)

    # A run emitted before its groupable ancestor was reached is already included
    # in that ancestor's group; drop the earlier standalone entry.
    return [entry for entry in grouped_runs if entry['id'] not in absorbed_run_ids]