Spaces:
Running
Running
File size: 14,847 Bytes
c2ea5ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
from langsmith_agent_graph.component_types import KnowledgeGraph
from agentgraph.shared.component_types import KnowledgeGraph as KG , Entity, Relation
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from typing import List
import json
from core.azure import azure_provider
from core.logger import setup_logger
from pydantic import BaseModel, Field
# Module-level logger, created through the project's ``setup_logger`` helper
# and keyed by this module's name.
logger = setup_logger(__name__)
def _referenced_text(refs, content_mapping: dict) -> str:
    """Return the raw trace lines that ``refs`` point at, joined by newlines.

    Each reference carries ``run_id``, ``line_start`` and ``line_end``.  The
    line numbers are the 1-based, inclusive ``<L#>`` numbers shown to the
    model in the annotated run, so they are resolved against the *lines* of
    the raw run text (``content_mapping[run_id][0]``), not against its
    characters.  References whose ``run_id`` is not in ``content_mapping``
    are skipped.
    """
    lines_by_run = {}  # run_id -> raw text split into lines (split once per run)
    snippets = []
    for ref in refs:
        if ref.run_id not in content_mapping:
            continue
        if ref.run_id not in lines_by_run:
            lines_by_run[ref.run_id] = content_mapping[ref.run_id][0].split('\n')
        # <L1> is the first line, hence the -1; clamp so a stray 0 cannot
        # turn into a negative index that wraps around to the end of the run.
        first = max(ref.line_start - 1, 0)
        snippets.append('\n'.join(lines_by_run[ref.run_id][first:ref.line_end]))
    return '\n'.join(snippets)


def get_formatted_kg(kg: KnowledgeGraph, content_mapping: dict) -> KG:
    """Convert the extraction-time graph into the shared ``KG`` representation.

    Line references on entities (``raw_prompt_ref``) and relations
    (``interaction_prompt_ref``) are replaced by the text they refer to.

    Args:
        kg: Graph produced by the extraction loop.
        content_mapping: Maps a run id to a ``(raw_text, annotated_text)``
            pair, as populated by ``update_knowledge_graph_from_trace``.

    Returns:
        A ``KG`` whose entities and relations carry resolved prompt text.
    """
    formatted_entities = [
        Entity(
            id=entity.id,
            entity_type=entity.type,
            name=entity.name,
            description=entity.description,
            raw_prompt=_referenced_text(entity.raw_prompt_ref, content_mapping),
        )
        for entity in kg.entities
    ]
    formatted_relations = [
        Relation(
            id=relation.id,
            description=relation.description,
            source=relation.source,
            target=relation.target,
            type=relation.type,
            interaction_prompt=_referenced_text(relation.interaction_prompt_ref, content_mapping),
        )
        for relation in kg.relations
    ]
    return KG(
        entities=formatted_entities,
        relations=formatted_relations,
        system_name=kg.system_name,
        system_summary=kg.system_summary,
    )
def get_runs_summary(runs: List[dict]) -> str:
    """Serialize the headline fields of every run as an indented JSON array.

    Only ``id``, ``name``, ``run_type``, ``start_time`` and ``end_time`` are
    kept, in that order; a run missing any of them raises ``KeyError``.
    """
    summary_fields = ('id', 'name', 'run_type', 'start_time', 'end_time')
    summaries = []
    for run in runs:
        summaries.append({field: run[field] for field in summary_fields})
    return json.dumps(summaries, indent=2)
def get_runs_relations(runs: List[dict]) -> str:
    """Describe the run hierarchy as one ``parent of`` triple per line.

    For every run and every id in its ``parent_run_ids`` a line of the form
    ``((<parent_id>, 'parent of', <run_id>))`` is produced.  Runs without
    parents contribute nothing; a missing or ``None`` ``parent_run_ids`` is
    treated as "no parents", matching how ``group_chain_runs`` reads the key.

    Returns:
        The lines joined by newlines, or an empty string when no run has a
        parent.
    """
    triples = []
    for run in runs:
        for parent_id in run.get('parent_run_ids') or []:
            triples.append(f"({parent_id}, 'parent of', {run['id']})")
    # The second pair of parentheses is part of the established prompt format
    # and is kept as-is so the text sent to the model does not change.
    return '\n'.join(f"({triple})" for triple in triples)
def get_annotated_run(id2run: dict, run: dict) -> tuple[str, str]:
    """Render a run as JSON, both raw and with ``<L#>`` line markers.

    Args:
        id2run: Maps run id to the run dictionary.
        run: The run being processed; only its ``id`` is used to look the
            run up in ``id2run``.

    Returns:
        ``(raw, annotated)`` where ``raw`` is ``json.dumps(..., indent=2)`` of
        the run and ``annotated`` holds the same lines, stripped of
        surrounding whitespace and prefixed with ``<L1>``, ``<L2>``, ...
    """
    # NOTE(review): the run is re-read from id2run, so keys that exist only on
    # the ``run`` argument (e.g. ``grouped_children`` added by
    # group_chain_runs) are not part of the rendered text - confirm intended.
    current_run = json.dumps(id2run[run['id']], indent=2)
    annotated_current_run = '\n'.join(
        f"<L{line_no}> {text.strip()}"
        for line_no, text in enumerate(current_run.split('\n'), start=1)
    )
    return current_run, annotated_current_run
def get_parents(id2run: dict, run: dict) -> str:
    """Return the JSON dump of every parent of ``run``, one after another.

    Args:
        id2run: Maps run id to the run dictionary.
        run: Run whose ``parent_run_ids`` are resolved through ``id2run``.
            A missing or ``None`` ``parent_run_ids`` is treated as "no
            parents".

    Returns:
        The parents rendered with ``json.dumps(..., indent=2)`` and joined by
        newlines; an empty string when the run has no parents.

    Raises:
        KeyError: If a parent id is not present in ``id2run``.
    """
    parent_ids = run.get('parent_run_ids') or []
    # Plain str.join instead of an f-string wrapper: the wrapper added nothing
    # and a backslash inside an f-string expression only parses on 3.12+.
    return "\n".join(json.dumps(id2run[parent_id], indent=2) for parent_id in parent_ids)
# System prompt sent with every call made through ``agent`` below.  It has no
# placeholders, so a plain (non-f) string yields the identical text.
system_prompt = """
You are a Agent Trace Graph Analyzer Expert. Your goal is to analyze a Knowledge Graph from a given trace of a LLM agent.
"""

# Shared agent used by the extraction and repair steps in this module: the
# "gpt-4.1-mini" model served through the project's Azure provider, configured
# with retries=5.
agent = Agent(
    retries=5,
    system_prompt=system_prompt,
    model=OpenAIModel("gpt-4.1-mini", provider=azure_provider),
)
async def extract_knowledge_graph(runs: List[dict]) -> KG:
    """Build a knowledge graph from a trace, one (grouped) run at a time.

    The first run is always processed on its own; the remaining runs are
    grouped with ``group_chain_runs``.  Each resulting run is fed to
    ``update_knowledge_graph_from_trace`` together with the graph built so
    far, and the final graph is converted with ``get_formatted_kg``.

    Side effect: the grouped runs are written to ``grouped_runs.json`` in the
    current working directory.

    Args:
        runs: Run dictionaries of one trace.

    Returns:
        The formatted graph (``KG``) returned by ``get_formatted_kg``.

    Raises:
        IndexError: If ``runs`` is empty.
    """
    id2run = {run['id']: run for run in runs}
    runs_info = get_runs_summary(runs)
    runs_relations = get_runs_relations(runs)
    grouped_runs = [runs[0]] + group_chain_runs(runs[1:])
    # NOTE(review): looks like a debugging dump; it is overwritten on every
    # call - confirm it is still wanted.
    with open("grouped_runs.json", "w", encoding="utf-8") as f:
        json.dump(grouped_runs, f, indent=4)
    knowledge_graph = None
    # run id -> (raw run text, annotated run text); filled in by
    # update_knowledge_graph_from_trace and consumed by get_formatted_kg.
    run_mapping = {}
    for i, run in enumerate(grouped_runs):
        logger.info(f"Run: {i}/{len(grouped_runs)}")
        knowledge_graph = await update_knowledge_graph_from_trace(
            knowledge_graph, id2run, runs_info, runs_relations, run_mapping, run
        )
    return get_formatted_kg(knowledge_graph, run_mapping)
async def fix_knowledge_graph(kg, message_history, errors):
    """Ask the model to repair a knowledge graph that failed validation.

    Args:
        kg: The invalid graph; serialized into the prompt with
            ``model_dump_json(indent=2)``.
        message_history: Messages of the exchange that produced ``kg``,
            passed on to the agent so the request is seen in context.
        errors: Human-readable validation error strings, listed one per line
            in the prompt.

    Returns:
        The ``KnowledgeGraph`` returned by the model.  It is not validated
        again here.
    """
    # Joined outside the f-string: a backslash inside an f-string replacement
    # field is a SyntaxError on Python versions before 3.12.
    error_lines = '\n'.join(errors)
    user_prompt = f"""
## Knowledge Graph:
{kg.model_dump_json(indent=2)}
## Invalid entities and relations:
{error_lines}
Instruction:
- Source and target entity ids must exist in the Knowledge Graph (list of entity ids).
- Write the fixed Knowledge Graph in the same format as the previous one.
"""
    response = await agent.run(
        user_prompt,
        message_history=message_history,
        model_settings={'temperature': 0.0, 'seed': 42},
        output_type=KnowledgeGraph,
    )
    return response.output
async def update_knowledge_graph_from_trace(knowledge_graph, id2run, runs_info, runs_relations, run_mapping, run):
    """Process one run: ask the model for graph updates, merge and validate.

    Args:
        knowledge_graph: Graph built from the previous runs, or ``None`` for
            the first run.
        id2run: Maps run id to the run dictionary.
        runs_info: Summary of all runs, embedded in the prompt.
        runs_relations: Parent/child description of all runs, embedded in the
            prompt.
        run_mapping: Mutated in place; receives ``(raw, annotated)`` text of
            the current run under ``run['id']``.
        run: The run to process.

    Returns:
        The merged graph, or - when validation reports errors - whatever
        ``fix_knowledge_graph`` returns for it.
    """
    parent_runs = get_parents(id2run, run)
    current_run, annotated_current_run = get_annotated_run(id2run, run)
    run_mapping[run['id']] = (current_run, annotated_current_run)

    # Pre-compute the two conditional prompt sections.
    if knowledge_graph:
        previous_graph_section = knowledge_graph.model_dump_json(indent=2)
    else:
        previous_graph_section = "No knowledge graph yet"
    parents_section = parent_runs or "No parents"

    task_description = f"""
## Entities:
Agent: An intelligent system that interacts with the Input, performs Tasks, calls a Language Model (LLM) to generate text when needed, and uses Tools to complete Tasks.
Input: The query or data provided to the Agent to initiate the workflow.
Output: The final result generated by the Agent after completing the Task.
Task: The specific action or process assigned to the Agent based on the Input.
Tool: External resources or services accessed by the Agent to assist in performing the Task.
Human: The recipient of the Output who may also intervene in the Task.
## Relationships:
CONSUMED_BY: The Input is received and processed by the Agent.
PERFORMS: The Agent executes the Task based on the Input.
USES: The Agent accesses Tools as needed to complete the Task.
PRODUCES: The Task, once completed by the Agent, generates the Output.
DELIVERS_TO: The Output is provided to the Human.
INTERVENES: The Human or Agent may intervene in the Task to guide or modify its execution.
## Task:
Create/Update the Knowledge Graph based on the following information. If the Knowledge Graph already exists, you can update the components or add new components (entities or relations) to the graph.
## Instructions:
- Agent entities are capable to invoke runs with run_type=llm to generate text. Calling the llm is not a tool.
- When a Run with run_type=tool is a child of another Run with run_type=tool, both belong to the same Tool entity. Treat them as a single Tool—update the existing Tool entity's properties (e.g., append details from the child run to the parent's description or references) and do not create a new Tool entity. This is because child tool runs are often internal implementation details of the parent tool, and the Agent or user interacts only with the parent tool, not directly with the child.
- If the entity already exists, you can append more references for the entity in the raw_prompt_ref field.
- Always check the hierarchy in Runs and Runs Relations to identify parent-child relationships before creating or updating entities.
- You will return always the updated system name and summary.
- Return only new or updated entities and relations. Ignore entities and relations that are not new or updated.
## Runs:
{runs_info}
## Runs Relations:
{runs_relations}
## Previous Knowledge Graph:
{previous_graph_section}
## Parents:
{parents_section}
## Current run:
{annotated_current_run}
"""

    # Structured output: a short reasoning field next to the graph updates.
    class FormattedOutput(BaseModel):
        thoughts: str = Field(description="Your detailed step-by-step reasoning. Keep the response concise, under 50 tokens. Use evidence-based reasoning")
        knowledge_graph: KnowledgeGraph = Field(description="The Knowledge Graph to be updated based on the reasoning")

    settings = {'temperature': 0.0, 'seed': 42}
    response = await agent.run(task_description, model_settings=settings, output_type=FormattedOutput)
    knowledge_graph_updates = response.output.knowledge_graph
    logger.debug(f"Raw response: {response.output.thoughts}")

    # The first run has nothing to merge into: its output is the graph.
    if knowledge_graph:
        updated_knowledge_graph = update_knowledge_graph(knowledge_graph, knowledge_graph_updates)
    else:
        updated_knowledge_graph = knowledge_graph_updates

    errors = validate_knowledge_graph(updated_knowledge_graph)
    if not errors:
        return updated_knowledge_graph

    # Validation failed: hand the graph back to the model together with the
    # messages of this exchange.
    logger.info(f"Validation errors: {errors}")
    chat_history = response.new_messages()
    return await fix_knowledge_graph(updated_knowledge_graph, chat_history, errors)
def equal_content_reference(obj1, obj2):
    """Tell whether two content references point at the same span.

    Two references match when their ``run_id``, ``line_start`` and
    ``line_end`` attributes are all equal.
    """
    for field_name in ("run_id", "line_start", "line_end"):
        if not (getattr(obj1, field_name) == getattr(obj2, field_name)):
            return False
    return True
def update_knowledge_graph(knowledge_graph: KnowledgeGraph, knowledge_graph_updates: KnowledgeGraph) -> KnowledgeGraph:
    """Merge a partial update into an existing knowledge graph.

    * An entity/relation whose id already exists keeps its position and all
      fields except ``description`` and ``importance`` (taken from the
      update) and its reference list, to which references not already
      present (per ``equal_content_reference``) are appended.
    * An entity/relation with an unknown id is appended after the existing
      ones, in the order the update lists them.
    * ``system_name`` / ``system_summary`` are replaced only when the update
      provides a non-empty value.

    Neither input is modified; copies are made with ``model_copy``.

    Args:
        knowledge_graph: The graph built so far.
        knowledge_graph_updates: New or changed entities and relations.

    Returns:
        A new graph containing the merged result.
    """

    def _merge_refs(existing_refs, incoming_refs):
        """Return existing refs followed by incoming refs not already present."""
        fresh_refs = [
            ref for ref in incoming_refs
            if not any(equal_content_reference(ref, known) for known in existing_refs)
        ]
        return existing_refs + fresh_refs

    # --- entities ---------------------------------------------------------
    existing_entity_ids = {entity.id for entity in knowledge_graph.entities}
    incoming_entities = {entity.id: entity for entity in knowledge_graph_updates.entities}
    merged_entities = []
    for entity in knowledge_graph.entities:
        incoming = incoming_entities.get(entity.id)
        if incoming is None:
            merged_entities.append(entity)
            continue
        merged_entities.append(entity.model_copy(
            update={
                "importance": incoming.importance,
                "description": incoming.description,
                "raw_prompt_ref": _merge_refs(entity.raw_prompt_ref, incoming.raw_prompt_ref),
            }
        ))
    # Entities that did not exist before.  These must be added explicitly: the
    # loop above only walks the existing graph and therefore never sees them.
    merged_entities.extend(
        entity for entity_id, entity in incoming_entities.items()
        if entity_id not in existing_entity_ids
    )

    # --- relations --------------------------------------------------------
    existing_relation_ids = {relation.id for relation in knowledge_graph.relations}
    incoming_relations = {relation.id: relation for relation in knowledge_graph_updates.relations}
    merged_relations = []
    for relation in knowledge_graph.relations:
        incoming = incoming_relations.get(relation.id)
        if incoming is None:
            merged_relations.append(relation)
            continue
        merged_relations.append(relation.model_copy(
            update={
                "description": incoming.description,
                "importance": incoming.importance,
                "interaction_prompt_ref": _merge_refs(
                    relation.interaction_prompt_ref, incoming.interaction_prompt_ref
                ),
            }
        ))
    # Same as for entities: previously unseen relations are appended.
    merged_relations.extend(
        relation for relation_id, relation in incoming_relations.items()
        if relation_id not in existing_relation_ids
    )

    return knowledge_graph.model_copy(
        update={
            "system_name": knowledge_graph_updates.system_name or knowledge_graph.system_name,
            "system_summary": knowledge_graph_updates.system_summary or knowledge_graph.system_summary,
            "entities": merged_entities,
            "relations": merged_relations,
        }
    )
def validate_knowledge_graph(kg: KnowledgeGraph) -> List[str]:
    """Check that every relation endpoint refers to an existing entity.

    Args:
        kg: Graph to validate.

    Returns:
        One message per dangling endpoint - all invalid sources first, then
        all invalid targets, each in relation order.  An empty list means the
        graph is valid.
    """
    entity_ids = {entity.id for entity in kg.entities}
    error_descriptions = [
        f"Invalid Relation {relation.id} with source entity {relation.source}"
        for relation in kg.relations
        if relation.source not in entity_ids
    ]
    error_descriptions.extend(
        f"Invalid Relation {relation.id} with target entity {relation.target}"
        for relation in kg.relations
        if relation.target not in entity_ids
    )
    return error_descriptions
def group_chain_runs(runs: List[dict]) -> List[dict]:
    """
    Groups chain and tool runs with their descendants to optimize processing.

    A run of type ``chain`` or ``tool`` that has at least one child in
    ``runs`` is emitted as a shallow copy with two extra keys:
    ``grouped_children`` (all of its descendants, depth-first in input order,
    each listed once) and ``total_grouped_runs`` (descendants + 1).  Runs
    already absorbed into an earlier group are skipped; every other run is
    emitted unchanged.

    Args:
        runs: List of run dictionaries with fields like id, run_type,
            parent_run_ids, etc.  A missing or ``None`` ``parent_run_ids`` /
            ``run_type`` is treated as empty.

    Returns:
        List of grouped runs where chain/tool runs include their children.
        The input dictionaries are not modified.
    """
    # parent id -> direct children, in input order
    children_by_parent = {}
    for run in runs:
        for parent_id in run.get('parent_run_ids') or []:
            children_by_parent.setdefault(parent_id, []).append(run)

    groupable_types = {'chain', 'tool'}  # tool can also have complex nested operations

    def collect_all_children(run_id: str) -> List[dict]:
        """Collect every descendant of ``run_id`` once, in depth-first order."""
        # The visited set matters when ``parent_run_ids`` names several
        # ancestors: a run is then reachable through more than one path and
        # would otherwise be collected repeatedly (or forever, on a cycle).
        seen_ids = {run_id}
        descendants = []
        pending = list(reversed(children_by_parent.get(run_id, [])))
        while pending:
            candidate = pending.pop()
            candidate_id = candidate['id']
            if candidate_id in seen_ids:
                continue
            seen_ids.add(candidate_id)
            descendants.append(candidate)
            # Reversed so the first child is popped (visited) first.
            pending.extend(reversed(children_by_parent.get(candidate_id, [])))
        return descendants

    processed_run_ids = set()
    grouped_runs = []
    for run in runs:
        run_id = run['id']
        # Skip if already processed as part of another group
        if run_id in processed_run_ids:
            continue
        processed_run_ids.add(run_id)

        run_type = (run.get('run_type') or '').lower()
        if run_type in groupable_types and run_id in children_by_parent:
            all_children = collect_all_children(run_id)
            processed_run_ids.update(child['id'] for child in all_children)
            grouped_run = run.copy()
            grouped_run['grouped_children'] = all_children
            grouped_run['total_grouped_runs'] = len(all_children) + 1  # +1 for parent
            grouped_runs.append(grouped_run)
        else:
            # Keep individual runs (llm, standalone tools, chains without children)
            grouped_runs.append(run)
    return grouped_runs
|