File size: 14,847 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
from langsmith_agent_graph.component_types import KnowledgeGraph
from agentgraph.shared.component_types import KnowledgeGraph as KG , Entity, Relation
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from typing import List
import json
from core.azure import azure_provider  
from core.logger import setup_logger
from pydantic import BaseModel, Field

logger = setup_logger(__name__)

def get_formatted_kg(kg: KnowledgeGraph, content_mapping: dict) -> KG:
    """Convert the extraction-time KnowledgeGraph into the shared ``KG`` type.

    Prompt references (``raw_prompt_ref`` on entities, ``interaction_prompt_ref`` on
    relations) are resolved against ``content_mapping`` into the text they point to.

    Args:
        kg: The graph produced by the extraction loop.
        content_mapping: Maps run id -> (raw JSON dump, annotated dump), as filled by
            ``update_knowledge_graph_from_trace``. Only the raw dump (index 0) is read.

    Returns:
        A ``KG`` with the same entities, relations, system name and summary, where each
        reference list is replaced by the referenced text.
    """
    def resolve_refs(refs) -> str:
        # Concatenate the referenced line ranges; refs to runs missing from
        # content_mapping are skipped.
        snippets = []
        for ref in refs:
            if ref.run_id not in content_mapping:
                continue
            # Split into lines: the stored dump is a single string, and slicing it
            # directly would cut characters rather than lines.
            lines = content_mapping[ref.run_id][0].split('\n')
            # Line numbers follow the 1-based ``<L{n}>`` markers shown to the model
            # and line_end is inclusive, hence ``- 1`` on the start only.
            start = max(ref.line_start - 1, 0)
            snippets.append('\n'.join(lines[start:ref.line_end]))
        return '\n'.join(snippets)

    return KG(
        entities=[Entity(
            id=entity.id,
            entity_type=entity.type,
            name=entity.name,
            description=entity.description,
            raw_prompt=resolve_refs(entity.raw_prompt_ref),
        ) for entity in kg.entities],
        relations=[Relation(
            id=relation.id,
            description=relation.description,
            source=relation.source,
            target=relation.target,
            type=relation.type,
            interaction_prompt=resolve_refs(relation.interaction_prompt_ref),
        ) for relation in kg.relations],
        system_name=kg.system_name,
        system_summary=kg.system_summary,
    )

def get_runs_summary(runs: List[dict]) -> str:
    """Return a pretty-printed JSON list describing each run by its key metadata.

    Only ``id``, ``name``, ``run_type``, ``start_time`` and ``end_time`` are kept, in that
    order; a run missing any of these keys raises ``KeyError``.
    """
    summary_keys = ('id', 'name', 'run_type', 'start_time', 'end_time')
    summaries = [{key: run[key] for key in summary_keys} for run in runs]
    return json.dumps(summaries, indent=2)

def get_runs_relations(runs: List[dict]) -> str:
    """Describe every parent -> child link between runs, one tuple per line.

    Each line has the form ``(parent_id, 'parent of', child_id)``, emitted in the order
    of ``runs`` and then of each run's ``parent_run_ids``. Runs whose ``parent_run_ids``
    is empty, missing or None contribute no lines.
    """
    # Each tuple is already parenthesised; it must not be wrapped a second time.
    return '\n'.join(
        f"({parent_id}, 'parent of', {run['id']})"
        for run in runs
        for parent_id in run.get('parent_run_ids') or []
    )


def get_annotated_run(id2run: dict, run: dict) -> tuple[str, str]:
    """Render a run as JSON, plus a line-numbered copy for the LLM prompt.

    If ``run`` carries non-empty ``grouped_children`` (as produced by
    ``group_chain_runs``), the grouped run itself is dumped so its children reach the
    model. Otherwise the run stored in ``id2run`` under ``run['id']`` is dumped.

    Returns:
        ``(current_run, annotated_current_run)``: the ``json.dumps(..., indent=2)`` text,
        and the same text with every line stripped and prefixed with a 1-based
        ``<L{n}>`` marker.
    """
    # Without this, the children gathered by group_chain_runs would never be shown,
    # because id2run holds the ungrouped original.
    source = run if run.get('grouped_children') else id2run[run['id']]
    current_run = json.dumps(source, indent=2)
    annotated_current_run = '\n'.join(
        f"<L{line_no}>  {line.strip()}"
        for line_no, line in enumerate(current_run.split('\n'), start=1)
    )
    return current_run, annotated_current_run


def get_parents(id2run: dict, run: dict) -> str:
    """Return the JSON dumps (indent=2) of all parents of ``run``, separated by newlines.

    Parents are looked up in ``id2run`` by each id in ``run['parent_run_ids']``; an
    id absent from ``id2run`` raises ``KeyError``. No parents yields an empty string.
    """
    parent_dumps = [
        json.dumps(id2run[parent_id], indent=2)
        for parent_id in run['parent_run_ids']
    ]
    return "\n".join(parent_dumps)


# System prompt given to the shared ``agent`` used for both extraction and repair calls.
system_prompt = (
    "\n"
    "You are a Agent Trace Graph Analyzer Expert. Your goal is to analyze a Knowledge Graph from a given trace of a LLM agent.\n"
)

# Single module-level pydantic_ai agent backed by the Azure provider; retries=5 is
# passed straight to pydantic_ai (see its Agent docs for the retry semantics).
agent = Agent(
    system_prompt=system_prompt,
    retries=5,
    model=OpenAIModel("gpt-4.1-mini", provider=azure_provider),
)

async def extract_knowledge_graph(runs: List[dict]) -> KG:
    """Build a Knowledge Graph from a list of trace runs, one LLM pass per run group.

    ``runs[0]`` is processed on its own; the remaining runs are grouped by
    ``group_chain_runs``. Each group updates the graph incrementally through
    ``update_knowledge_graph_from_trace``.

    Args:
        runs: Run dicts with at least ``id``, ``name``, ``run_type``, ``start_time``,
            ``end_time`` and ``parent_run_ids``. Presumably ``runs[0]`` is the trace
            root — TODO confirm with callers.

    Returns:
        The graph converted by ``get_formatted_kg`` (the shared ``KG`` type), with
        prompt references resolved to text.

    Raises:
        IndexError: if ``runs`` is empty.

    Side effects:
        Writes the grouped runs to ``grouped_runs.json`` in the current working directory.
    """
    id2run = {run['id']: run for run in runs}
    runs_info = get_runs_summary(runs)
    runs_relations = get_runs_relations(runs)

    grouped_runs = [runs[0]] + group_chain_runs(runs[1:])

    # NOTE(review): debug dump into the CWD; consider gating it behind a flag.
    with open("grouped_runs.json", "w", encoding="utf-8") as f:
        json.dump(grouped_runs, f, indent=4)

    knowledge_graph = None
    # Filled by update_knowledge_graph_from_trace: run id -> (raw dump, annotated dump).
    run_mapping = {}
    total = len(grouped_runs)
    for position, run in enumerate(grouped_runs, start=1):
        # 1-based so the last run logs "N/N".
        logger.info(f"Run: {position}/{total}")
        knowledge_graph = await update_knowledge_graph_from_trace(
            knowledge_graph, id2run, runs_info, runs_relations, run_mapping, run
        )

    return get_formatted_kg(knowledge_graph, run_mapping)

async def fix_knowledge_graph(kg, message_history, errors):
    """Ask the agent to repair ``kg`` given the validation ``errors``.

    The prompt shows the current graph (``model_dump_json(indent=2)``) and one error per
    line, and continues the conversation in ``message_history``. The agent is run at
    temperature 0 with seed 42.

    Returns:
        The ``KnowledgeGraph`` parsed from the agent's output.
    """
    graph_json = kg.model_dump_json(indent=2)
    # Joined outside the literal so no backslash appears inside an f-string expression.
    error_lines = "\n".join(errors)
    prompt = (
        "\n"
        "## Knowledge Graph:\n"
        f"{graph_json}\n"
        "\n"
        "## Invalid entities and relations:\n"
        f"{error_lines}\n"
        "\n"
        "Instruction: \n"
        "- Source and target entity ids must exist in the Knowledge Graph (list of entity ids).\n"
        "- Write the fixed Knowledge Graph in the same format as the previous one.\n"
    )
    settings = {'temperature': 0.0, 'seed': 42}
    result = await agent.run(
        prompt,
        message_history=message_history,
        model_settings=settings,
        output_type=KnowledgeGraph,
    )
    return result.output

def _build_task_description(runs_info, runs_relations, knowledge_graph, parent_runs, annotated_current_run) -> str:
    """Render the extraction prompt for one (possibly grouped) run."""
    previous_kg = knowledge_graph.model_dump_json(indent=2) if knowledge_graph else "No knowledge graph yet"
    parents = parent_runs if parent_runs else "No parents"
    # Explicit "\n" pieces keep the original trailing spaces intact.
    return (
        "\n"
        "## Entities:\n"
        "Agent: An intelligent system that interacts with the Input, performs Tasks, calls a Language Model (LLM) to generate text when needed, and uses Tools to complete Tasks.\n"
        "Input: The query or data provided to the Agent to initiate the workflow.\n"
        "Output: The final result generated by the Agent after completing the Task.\n"
        "Task: The specific action or process assigned to the Agent based on the Input.\n"
        "Tool: External resources or services accessed by the Agent to assist in performing the Task. \n"
        "Human: The recipient of the Output who may also intervene in the Task.\n"
        "\n"
        "## Relationships:\n"
        "CONSUMED_BY: The Input is received and processed by the Agent.\n"
        "PERFORMS: The Agent executes the Task based on the Input.\n"
        "USES: The Agent accesses Tools as needed to complete the Task.\n"
        "PRODUCES: The Task, once completed by the Agent, generates the Output.\n"
        "DELIVERS_TO: The Output is provided to the Human.\n"
        "INTERVENES: The Human or Agent may intervene in the Task to guide or modify its execution.\n"
        "\n"
        "## Task:\n"
        "Create/Update the Knowledge Graph based on the following information. If the Knowledge Graph already exists, you can update the components or add new components (entities or relations) to the graph.\n"
        "\n"
        "\n"
        "## Instructions:\n"
        "- Agent entities are capable to invoke runs with run_type=llm to generate text. Calling the llm is not a tool.\n"
        "- When a Run with run_type=tool is a child of another Run with run_type=tool, both belong to the same Tool entity. Treat them as a single Tool—update the existing Tool entity's properties (e.g., append details from the child run to the parent's description or references) and do not create a new Tool entity. This is because child tool runs are often internal implementation details of the parent tool, and the Agent or user interacts only with the parent tool, not directly with the child. \n"
        "- If the entity already exists, you can append more references for the entity in the raw_prompt_ref field.\n"
        "- Always check the hierarchy in Runs and Runs Relations to identify parent-child relationships before creating or updating entities.\n"
        "- You will return always the updated system name and summary. \n"
        "- Return only new or updated entities and relations. Ignore entities and relations that are not new or updated.\n"
        "\n"
        "## Runs:\n"
        f"{runs_info}\n"
        "## Runs Relations:\n"
        f"{runs_relations}\n"
        "## Previous Knowledge Graph:\n"
        f"{previous_kg}\n"
        "## Parents:\n"
        f"{parents}\n"
        "## Current run:\n"
        f"{annotated_current_run}\n"
    )


async def update_knowledge_graph_from_trace(knowledge_graph, id2run, runs_info, runs_relations, run_mapping, run):
    """Run one LLM extraction pass for ``run`` and merge the result into the graph.

    Side effect: stores ``(current_run, annotated_current_run)`` in
    ``run_mapping[run['id']]`` so references can later be resolved to text.

    Args:
        knowledge_graph: The graph so far, or None on the first call.
        id2run: Map from run id to run dict (used for parents and the run itself).
        runs_info: JSON summary of all runs (see ``get_runs_summary``).
        runs_relations: Parent/child lines for all runs (see ``get_runs_relations``).
        run_mapping: Mutable run-id -> (raw dump, annotated dump) mapping.
        run: The (possibly grouped) run to process.

    Returns:
        The merged graph. If it fails validation, the single LLM-repaired graph is
        returned instead; any errors still left after the repair are logged.
    """
    parent_runs = get_parents(id2run, run)
    current_run, annotated_current_run = get_annotated_run(id2run, run)
    run_mapping[run['id']] = (current_run, annotated_current_run)

    task_description = _build_task_description(
        runs_info, runs_relations, knowledge_graph, parent_runs, annotated_current_run
    )

    # Structured output: short reasoning plus the new/updated graph components.
    class FormattedOutput(BaseModel):
        thoughts: str = Field(description="Your detailed step-by-step reasoning. Keep the response concise, under 50 tokens. Use evidence-based reasoning")
        knowledge_graph: KnowledgeGraph = Field(description="The Knowledge Graph to be updated based on the reasoning")

    response = await agent.run(task_description, model_settings={'temperature': 0.0, 'seed': 42}, output_type=FormattedOutput)
    knowledge_graph_updates = response.output.knowledge_graph
    logger.debug(f"Raw response: {response.output.thoughts}")

    # The first pass has no previous graph to merge into.
    updated_knowledge_graph = update_knowledge_graph(knowledge_graph, knowledge_graph_updates) if knowledge_graph else knowledge_graph_updates

    errors = validate_knowledge_graph(updated_knowledge_graph)
    if not errors:
        return updated_knowledge_graph

    logger.info(f"Validation errors: {errors}")
    fixed_knowledge_graph = await fix_knowledge_graph(updated_knowledge_graph, response.new_messages(), errors)
    # The repair is itself LLM output; check it rather than trusting it blindly.
    remaining_errors = validate_knowledge_graph(fixed_knowledge_graph)
    if remaining_errors:
        logger.warning(f"Knowledge graph still invalid after repair: {remaining_errors}")
    return fixed_knowledge_graph

def equal_content_reference(obj1, obj2):
    """Return True when both references point at the same run and the same line span."""
    # Compared field by field in this order, stopping at the first mismatch.
    return all(
        getattr(obj1, field) == getattr(obj2, field)
        for field in ('run_id', 'line_start', 'line_end')
    )

def update_knowledge_graph(knowledge_graph: KnowledgeGraph, knowledge_graph_updates: KnowledgeGraph) -> KnowledgeGraph:
    """Merge an incremental LLM update into an existing Knowledge Graph.

    For each entity/relation in ``knowledge_graph_updates`` whose id already exists in
    ``knowledge_graph``, the existing item is copied with ``importance`` and
    ``description`` taken from the update, and with any prompt references not already
    present appended. Items whose id is not yet present are appended after the existing
    ones. ``system_name`` / ``system_summary`` are replaced only when the update
    provides a truthy value.

    Args:
        knowledge_graph: The current graph. It is not modified; copies are returned.
        knowledge_graph_updates: New or updated components returned by the LLM.

    Returns:
        A copy of ``knowledge_graph`` with merged entities and relations.
    """
    existing_entity_ids = {e.id for e in knowledge_graph.entities}
    new_entities = {e.id: e for e in knowledge_graph_updates.entities if e.id not in existing_entity_ids}
    updated_entities = {e.id: e for e in knowledge_graph_updates.entities if e.id in existing_entity_ids}

    merged_entities = []
    for entity in knowledge_graph.entities:
        update = updated_entities.get(entity.id)
        if update is None:
            merged_entities.append(entity)
            continue
        added_refs = [
            ref for ref in update.raw_prompt_ref
            if not any(equal_content_reference(ref, r) for r in entity.raw_prompt_ref)
        ]
        merged_entities.append(entity.model_copy(
            update={
                "importance": update.importance,
                "description": update.description,
                "raw_prompt_ref": entity.raw_prompt_ref + added_refs,
            }
        ))
    # New entities are by definition absent from knowledge_graph.entities, so the loop
    # above never sees them; they must be appended explicitly.
    merged_entities.extend(new_entities.values())

    existing_relation_ids = {r.id for r in knowledge_graph.relations}
    new_relations = {r.id: r for r in knowledge_graph_updates.relations if r.id not in existing_relation_ids}
    updated_relations = {r.id: r for r in knowledge_graph_updates.relations if r.id in existing_relation_ids}

    merged_relations = []
    for relation in knowledge_graph.relations:
        update = updated_relations.get(relation.id)
        if update is None:
            merged_relations.append(relation)
            continue
        added_refs = [
            ref for ref in update.interaction_prompt_ref
            if not any(equal_content_reference(ref, r) for r in relation.interaction_prompt_ref)
        ]
        merged_relations.append(relation.model_copy(
            update={
                "description": update.description,
                "importance": update.importance,
                "interaction_prompt_ref": relation.interaction_prompt_ref + added_refs,
            }
        ))
    # Same reasoning as for entities: brand-new relations are appended here.
    merged_relations.extend(new_relations.values())

    return knowledge_graph.model_copy(
        update={
            "system_name": knowledge_graph_updates.system_name or knowledge_graph.system_name,
            "system_summary": knowledge_graph_updates.system_summary or knowledge_graph.system_summary,
            "entities": merged_entities,
            "relations": merged_relations,
        }
    )

def validate_knowledge_graph(kg: KnowledgeGraph) -> List[str]:
    """List relations whose source or target is not an entity id of ``kg``.

    All invalid-source messages come first, then all invalid-target messages, each in
    relation order. These strings are forwarded to the LLM by ``fix_knowledge_graph``.

    Returns:
        Human-readable error descriptions; an empty list means the graph is consistent.
    """
    entity_ids = {entity.id for entity in kg.entities}
    # Note the space before the id; without it the id ran into the word "entity".
    errors = [
        f"Invalid Relation {r.id} with source entity {r.source}"
        for r in kg.relations
        if r.source not in entity_ids
    ]
    errors.extend(
        f"Invalid Relation {r.id} with target entity {r.target}"
        for r in kg.relations
        if r.target not in entity_ids
    )
    return errors

def group_chain_runs(runs: List[dict]) -> List[dict]:
    """
    Group chain/tool runs that have descendants together with those descendants.

    A run of type ``chain`` or ``tool`` (case-insensitive) that appears as a parent of
    another run is emitted as a shallow copy with two extra keys:
    ``grouped_children`` (its descendants in depth-first pre-order, each listed once)
    and ``total_grouped_runs`` (descendants + 1). Those descendants are not emitted on
    their own afterwards. Every other run is emitted unchanged, in input order. The
    input dicts are not modified.

    Args:
        runs: Run dicts with at least ``id``; ``run_type`` and ``parent_run_ids`` are
            optional (missing or None is treated as empty).

    Returns:
        The list of grouped and standalone runs.
    """
    # Index each run under every id listed in its parent_run_ids.
    children_by_parent = {}
    for run in runs:
        for parent_id in run.get('parent_run_ids') or []:
            children_by_parent.setdefault(parent_id, []).append(run)

    groupable_types = {'chain', 'tool'}  # tool can also have complex nested operations
    processed_run_ids = set()
    grouped_runs = []

    def collect_all_children(root_id: str) -> List[dict]:
        """Return all descendants of root_id in pre-order, each at most once."""
        # Iterative DFS with a seen-set: a run listed under several ancestors is kept
        # once, cycles terminate, and deep traces cannot hit the recursion limit.
        collected = []
        seen = {root_id}
        stack = list(reversed(children_by_parent.get(root_id, [])))
        while stack:
            child = stack.pop()
            if child['id'] in seen:
                continue
            seen.add(child['id'])
            collected.append(child)
            stack.extend(reversed(children_by_parent.get(child['id'], [])))
        return collected

    for run in runs:
        run_id = run['id']
        # Skip runs already absorbed into an earlier group (or duplicate entries).
        if run_id in processed_run_ids:
            continue
        processed_run_ids.add(run_id)

        run_type = (run.get('run_type') or '').lower()
        if run_type in groupable_types and run_id in children_by_parent:
            all_children = collect_all_children(run_id)
            processed_run_ids.update(child['id'] for child in all_children)

            grouped_run = run.copy()
            grouped_run['grouped_children'] = all_children
            grouped_run['total_grouped_runs'] = len(all_children) + 1  # +1 for parent
            grouped_runs.append(grouped_run)
        else:
            # llm runs, standalone tools, and chains without children stay as-is.
            grouped_runs.append(run)

    return grouped_runs