# Deploy AgentGraph: Complete agent monitoring and knowledge graph system (commit c2ea5ed)
#!/usr/bin/env python3
import argparse
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from agentgraph.shared.models.platform_models.langsmith import (
LangFuseObservation,
LangFuseSession,
LangFuseTrace,
LangSmithRun,
LangSmithTrace,
)
def is_empty_value(value: Any) -> bool:
    """Check if value is empty (null, empty array, empty string)."""
    if value is None:
        return True
    if isinstance(value, str):
        # Whitespace-only strings count as empty too.
        return value.strip() == ""
    return value == [] or value == {}
def try_parse_json(text: str) -> Any:
    """Parse *text* as JSON if it looks like an object/array; return None otherwise."""
    candidate = text.strip()
    if not candidate.startswith(("{", "[")):
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None
def filter_empty_values(obj: Any) -> Any:
    """Recursively drop None / [] / "" (and blank-string values) from nested data.

    Dict entries are removed when the value is empty before OR after the
    recursive filtering.  List items are screened only before recursion, so
    an item that becomes empty afterwards is still kept.  Empty dicts are
    not treated as empty and survive filtering.
    """
    if isinstance(obj, dict):
        result = {}
        for key, value in obj.items():
            if value is None or value == [] or value == "":
                continue
            if isinstance(value, str) and not value.strip():
                continue
            cleaned = filter_empty_values(value)
            if cleaned is None or cleaned == [] or cleaned == "":
                continue
            result[key] = cleaned
        return result
    if isinstance(obj, list):
        return [
            filter_empty_values(element)
            for element in obj
            if element is not None and element != [] and element != ""
        ]
    return obj
def collect_all_strings(obj: Any, strings: Optional[List[str]] = None) -> List[str]:
    """Gather every 'long' string leaf (>50 chars, no 'http') from nested data, in order."""
    if strings is None:
        strings = []
    if isinstance(obj, str):
        text = obj.strip()
        # Skip short snippets and anything that looks like it contains a URL.
        if len(text) > 50 and 'http' not in text:
            strings.append(text)
        return strings
    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        children = ()
    for child in children:
        collect_all_strings(child, strings)
    return strings
def count_strings(obj: Any, ignore_keys: Optional[List[str]] = None) -> int:
    """Count string leaves in nested data, skipping dict entries whose key is in *ignore_keys*."""
    ignored = ignore_keys or []
    if isinstance(obj, str):
        return 1
    if isinstance(obj, dict):
        total = 0
        for key, value in obj.items():
            if key not in ignored:
                total += count_strings(value, ignored)
        return total
    if isinstance(obj, list):
        return sum(count_strings(element, ignored) for element in obj)
    return 0
def find_repeated_patterns(obj: Any, topk: int = 10) -> List[Tuple[str, int, int]]:
    """Return up to *topk* duplicated long strings as (pattern, count, saved_bytes).

    Only patterns whose replacement by a placeholder would actually save
    bytes are kept; results are sorted by savings, largest first.
    """
    candidates = collect_all_strings(obj)
    if not candidates:
        print("No strings found")
        return []
    counts: Dict[str, int] = {}
    for text in candidates:
        counts[text] = counts.get(text, 0) + 1
    # Rough cost of one ${REPEATED_STRINGn} placeholder, in characters.
    var_name_length = 18
    repeated = [
        (pattern, count, (len(pattern) - var_name_length) * count)
        for pattern, count in counts.items()
        if count > 1 and (len(pattern) - var_name_length) * count > 0
    ]
    if not repeated:
        print("No repeated patterns found")
    else:
        print(f"Found {len(repeated)} repeated patterns")
    repeated.sort(key=lambda entry: entry[2], reverse=True)
    return repeated[:topk]
def replace_string_in_obj(obj: Any, pattern: str, replacement: str) -> Any:
    """Recursively swap any string whose stripped form equals *pattern* for *replacement*."""
    if isinstance(obj, str):
        return replacement if obj.strip() == pattern else obj
    if isinstance(obj, dict):
        return {key: replace_string_in_obj(value, pattern, replacement) for key, value in obj.items()}
    if isinstance(obj, list):
        return [replace_string_in_obj(element, pattern, replacement) for element in obj]
    return obj
def compress_repeated_strings(obj: Any, topk: int) -> Any:
    """Replace the top duplicated strings in *obj* with ${REPEATED_STRINGn} placeholders.

    On success returns a new dict whose first key, "global_variables", maps
    placeholder names to the original strings, followed by the (rewritten)
    original keys.  On any failure the input is returned as-is.
    """
    try:
        patterns = find_repeated_patterns(obj, topk=topk)
        if not patterns:
            print("No patterns to compress")
            return obj
        global_variables: Dict[str, str] = {}
        for index, (pattern, _, _) in enumerate(patterns, start=1):
            var_name = f"REPEATED_STRING{index}"
            global_variables[var_name] = pattern
            obj = replace_string_in_obj(obj, pattern, f"${{{var_name}}}")
        if not global_variables:
            print("No global_variables created")
            return obj
        # "global_variables" goes first so readers see the legend before the data.
        compressed = {"global_variables": global_variables}
        for key in obj.keys():
            compressed[key] = obj[key]
        print(f"Created {len(global_variables)} global variables")
        return compressed
    except Exception as e:
        print(f"Compression failed: {e}")
        return obj
def process_filtered_data(filtered_data: Any, topk: int, maximum: int = 512000) -> Any:
    """Compress repeated strings, then truncate to *maximum* chars (~128k tokens * 4)."""
    compressed = compress_repeated_strings(filtered_data, topk=topk)
    return truncate_text(compressed, maximum)
def truncate_object(obj: Any, max_char: int, ignore_keys: Optional[List[str]] = None) -> Any:
    """Recursively clamp every string leaf to *max_char* chars using a middle ellipsis.

    Empty values are filtered first; dict entries whose key appears in
    *ignore_keys* are dropped entirely.
    """
    obj = filter_empty_values(obj)
    ignored = ignore_keys or []
    if isinstance(obj, str):
        # Collapse runs of newlines/tabs before measuring length.
        cleaned = re.sub(r"\t+", "\t", re.sub(r"\n+", "\n", obj.strip()))
        if len(cleaned) <= max_char:
            return cleaned
        half = (max_char - 3) // 2
        return cleaned[:half] + "..." + cleaned[-half:]
    if isinstance(obj, dict):
        return {
            key: truncate_object(value, max_char, ignored)
            for key, value in obj.items()
            if key not in ignored
        }
    if isinstance(obj, list):
        return [truncate_object(element, max_char, ignored) for element in obj]
    return obj
def truncate_text(text: Any, max_length: int, min_char: int = 25, ignore_keys: Optional[List[str]] = None) -> Any:
    """Truncate *text* (string or nested structure) to roughly *max_length* characters.

    Strings that parse as JSON objects/arrays are parsed and truncated
    structurally.  For dicts/lists the per-leaf budget is *max_length*
    divided by the number of string leaves, floored at *min_char*.  A dict
    carrying a "global_variables" key (produced by compress_repeated_strings)
    gets a fixed 5000-char budget for that table.  Non-string scalars pass
    through unchanged.
    """
    if isinstance(text, str):
        # Embedded JSON: truncate the parsed structure instead of the raw text.
        parsed_json = try_parse_json(text)
        if parsed_json is not None:
            return truncate_text(parsed_json, max_length)
        trimmed = text.strip()
        # Collapse runs of newlines/tabs so whitespace doesn't eat the budget.
        cleaned = re.sub(r"\n+", "\n", trimmed)
        cleaned = re.sub(r"\t+", "\t", cleaned)
        if len(cleaned) > max_length:
            # Keep head and tail, elide the middle with "...".
            half_length = (max_length - 3) // 2
            first_half = cleaned[:half_length]
            second_half = cleaned[-half_length:]
            return first_half + "..." + second_half
        return cleaned
    elif isinstance(text, (dict, list)):
        if isinstance(text, dict) and "global_variables" in text:
            # Compressed payload: give the placeholder table its own budget so
            # the ${REPEATED_STRINGn} definitions aren't truncated with the data.
            result = {}
            for k, v in text.items():
                if k == "global_variables":
                    result[k] = truncate_text(v, 5000, min_char)
                else:
                    result[k] = truncate_text(v, max_length, min_char)
            return result
        filtered_text = filter_empty_values(text)
        string_count = count_strings(filtered_text, ignore_keys)
        if string_count == 0:
            return filtered_text
        # Spread the overall budget evenly across string leaves, but never
        # clamp a leaf below min_char.
        max_char = max_length // string_count
        if max_char < min_char:
            max_char = min_char
        return truncate_object(filtered_text, max_char, ignore_keys=ignore_keys)
    else:
        return text
def filter_langfuse_observation(observation: LangFuseObservation, max_char: Optional[int]) -> Dict[str, Any]:
    """Filter Langfuse observation object to keep only required fields."""
    def clip(value: Any) -> Any:
        # Truncate only when a character budget was given.
        return truncate_text(value, max_char) if max_char is not None else value

    filtered: Dict[str, Any] = {}
    if observation.name:
        filtered["name"] = observation.name
    if observation.type:
        filtered["type"] = observation.type
    if observation.input:
        filtered["input"] = clip(observation.input)
    if observation.output:
        filtered["output"] = clip(observation.output)
    if observation.statusMessage:
        filtered["statusMessage"] = observation.statusMessage
    if observation.level:
        filtered["level"] = observation.level
    # model / costDetails / usage are intentionally excluded from the view.
    return filtered
def hierarchy_langfuse_observation(root_obs: LangFuseObservation, observations: List[LangFuseObservation], max_char: Optional[int]) -> Dict[str, Any]:
    """Convert Langfuse observation to hierarchy dict structure.

    The original rebuilt an id->observation map and rescanned the full
    observation list on every recursive call, making tree construction
    O(n^2).  Here the parent -> children index is built once and each
    recursion level is a dict lookup; sibling order is preserved.
    """
    children_by_parent: Dict[Any, List[LangFuseObservation]] = {}
    for obs in observations:
        children_by_parent.setdefault(obs.parentObservationId, []).append(obs)

    def build(node: LangFuseObservation) -> Dict[str, Any]:
        # Filter this node, then attach its direct children depth-first.
        filtered = filter_langfuse_observation(node, max_char)
        children = children_by_parent.get(node.id)
        if children:
            filtered["child_observations"] = [build(child) for child in children]
        return filtered

    return build(root_obs)
def set_hierarchy_depths(node: Dict[str, Any], child_key: str, depth: int = 0) -> int:
    """Set depth for hierarchy structure (in place) and return the max depth reached."""
    node["depth"] = depth
    children = node.get(child_key)
    if not children:
        return depth
    # Deepest leaf among the subtrees; each child is at least depth + 1.
    return max(set_hierarchy_depths(child, child_key, depth + 1) for child in children)
def filter_langfuse_trace(trace: LangFuseTrace, max_char: Optional[int], hierarchy: bool = False) -> Dict[str, Any]:
    """Filter Langfuse trace object to keep only required fields."""
    filtered: Dict[str, Any] = {}
    if trace.name:
        filtered["name"] = trace.name
    if trace.observations:
        if hierarchy:
            # Roots are observations without a parent; build one tree per root.
            roots = [obs for obs in trace.observations if obs.parentObservationId is None]
            filtered["observations"] = [
                hierarchy_langfuse_observation(root, trace.observations, max_char) for root in roots
            ]
            deepest = 0
            for tree in filtered["observations"]:
                deepest = max(deepest, set_hierarchy_depths(tree, "child_observations"))
            filtered["observation_depth"] = deepest
        else:
            filtered["observations"] = [
                filter_langfuse_observation(obs, max_char) for obs in trace.observations
            ]
    if trace.totalCost:
        filtered["total_cost"] = trace.totalCost
    if trace.input:
        filtered["input"] = truncate_text(trace.input, max_char) if max_char is not None else trace.input
    if trace.output:
        filtered["output"] = truncate_text(trace.output, max_char) if max_char is not None else trace.output
    return filtered
def filter_langsmith_run(run: LangSmithRun, max_char: Optional[int]) -> Dict[str, Any]:
    """Filter LangSmith trace object to keep only required fields."""
    # Keys whose contents are framework noise (metadata, token accounting, ids).
    ignore_keys = [
        "additional_kwargs",
        "response_metadata",
        "iterations",
        "usage_metadata",
        "id",
        "kwargs",
        "example",
        "llm_output",
        "lc",
        "metadata",
        "generation_info",
        "args",
        "output",
        "outputs",
    ]
    filtered: Dict[str, Any] = {}
    if run.inputs:
        inputs = truncate_text(run.inputs, max_char, ignore_keys=ignore_keys) if max_char is not None else run.inputs
        # Drop the key entirely if filtering emptied the dict.
        if inputs != {}:
            filtered["inputs"] = inputs
    # run.outputs is intentionally omitted from the filtered view.
    if run.error:
        filtered["error"] = truncate_text(run.error, max_char) if max_char is not None else run.error
    return filtered
def hierarchy_langsmith_run(root_run: LangSmithRun, runs: List[LangSmithRun], max_char: Optional[int]) -> Dict[str, Any]:
    """Convert LangSmith run to hierarchy dict structure.

    The original rebuilt an id->run map and rescanned the full run list at
    every level of recursion, which is O(n^2) for large traces.  The
    parent_run_id -> children index is now built once; sibling order is
    preserved.
    """
    children_by_parent: Dict[Any, List[LangSmithRun]] = {}
    for run in runs:
        children_by_parent.setdefault(run.parent_run_id, []).append(run)

    def build(node: LangSmithRun) -> Dict[str, Any]:
        # Filter this run, then attach its direct children depth-first.
        filtered = filter_langsmith_run(node, max_char)
        children = children_by_parent.get(node.id)
        if children:
            filtered["child_runs"] = [build(child) for child in children]
        return filtered

    return build(root_run)
def filter_langfuse_session(session: LangFuseSession,
                            max_char: Optional[int],
                            topk: int,
                            raw: bool = False,
                            replace: bool = False,
                            hierarchy: bool = False,
                            ) -> Dict[str, Any]:
    """Filter Langfuse trace object to keep only required fields."""
    if raw:
        # Raw mode: dump the full pydantic model untouched.
        return session.model_dump()
    filtered: Dict[str, Any] = {}
    if session.session_name:
        filtered["name"] = session.session_name
    if session.traces:
        filtered["traces"] = [filter_langfuse_trace(trace, max_char, hierarchy) for trace in session.traces]
    # Optionally compress repeated strings and cap the total size.
    return process_filtered_data(filtered, topk) if replace else filtered
def filter_langsmith_trace(trace: LangSmithTrace,
                           max_char: Optional[int],
                           topk: int,
                           raw: bool = False,
                           replace: bool = False,
                           hierarchy: bool = False,
                           ) -> Dict[str, Any]:
    """Filter LangSmith export data to keep only required fields."""
    if raw:
        # Raw mode: dump the full pydantic model untouched.
        return trace.model_dump()
    filtered: Dict[str, Any] = {}
    if trace.trace_name:
        filtered["name"] = trace.trace_name
    if trace.runs:
        if hierarchy:
            # The first run is treated as the root of the run tree.
            filtered["runs"] = [hierarchy_langsmith_run(trace.runs[0], trace.runs, max_char)]
            if filtered["runs"]:
                filtered["run_depth"] = set_hierarchy_depths(filtered["runs"][0], "child_runs")
        else:
            filtered["runs"] = [filter_langsmith_run(run, max_char) for run in trace.runs]
    # Optionally compress repeated strings and cap the total size.
    return process_filtered_data(filtered, topk) if replace else filtered
def detect_json_format(json_file: Path) -> str:
    """Detect if JSON file is Langfuse (session_id) or LangSmith (trace_id) format."""
    try:
        with open(json_file, 'r') as f:
            data = json.load(f)
        if 'session_id' in data:
            return 'langfuse'
        if 'trace_id' in data:
            return 'langsmith'
        print(f"Warning: Cannot detect format for {json_file.name}, defaulting to LangSmith")
        return 'langsmith'
    except Exception as e:
        # Unreadable/unparseable files fall back to the LangSmith path.
        print(f"Error reading {json_file.name}: {e}")
        return 'langsmith'
def process_traces_folder(traces_folder: str,
                          output_folder: str,
                          max_char: Optional[int],
                          topk: int,
                          raw: bool = False,
                          replace: bool = False,
                          hierarchy: bool = False,
                          ):
    """Process all trace files in the folder (both JSONL and JSON)."""
    traces_path = Path(traces_folder)
    if not traces_path.exists():
        raise FileNotFoundError(f"Traces folder not found: {traces_folder}")

    jsonl_files = list(traces_path.glob("*.jsonl"))
    json_files = list(traces_path.glob("*.json"))
    if not jsonl_files and not json_files:
        print(f"No JSONL or JSON files found in {traces_folder}")
        return

    # JSONL is assumed Langfuse; plain JSON files are sniffed by content.
    langfuse_files = list(jsonl_files)
    langsmith_files = []
    for json_file in json_files:
        if detect_json_format(json_file) == 'langfuse':
            langfuse_files.append(json_file)
        else:
            langsmith_files.append(json_file)

    output_path = Path(output_folder)
    output_path.mkdir(parents=True, exist_ok=True)

    for files, is_langfuse in ((langfuse_files, True), (langsmith_files, False)):
        if not files:
            continue
        print(f"Found {len(files)} {'Langfuse' if is_langfuse else 'LangSmith'} files")
        for json_file in files:
            with open(json_file, "r") as f:
                export_data = json.load(f)
            if is_langfuse:
                # Langfuse output is wrapped in a list; LangSmith output is not.
                result = [filter_langfuse_session(LangFuseSession(**export_data), max_char, topk, raw=raw, replace=replace, hierarchy=hierarchy)]
            else:
                result = filter_langsmith_trace(LangSmithTrace(**export_data), max_char, topk, raw=raw, replace=replace, hierarchy=hierarchy)
            output_file = output_path / f"{json_file.stem}.json"
            with open(output_file, "w") as f:
                json.dump(result, f, indent=2)
def main():
    """CLI entry point: parse arguments and filter every trace file in --traces.

    Returns 0 on success (used as the process exit code).
    """
    parser = argparse.ArgumentParser(description="Process trace files (JSONL for Langfuse, JSON for LangSmith)")
    parser.add_argument("--traces", required=True, help="Path to traces folder containing JSONL/JSON files")
    parser.add_argument("--output", default="traces", help="Output folder for filtered traces (default: traces)")
    parser.add_argument("--max_char", type=int, default=500, help="Maximum character length for each string (default: 500)")
    parser.add_argument("--topk", type=int, default=10, help="Topk for compression (default: 10)")
    parser.add_argument("--raw", action="store_true", help="Keep raw data (default: False)")
    parser.add_argument("--replace", action="store_true", help="Replace data with compressed data (default: False)")
    parser.add_argument("--hierarchy", action="store_true", help="Use hierarchy for compression (default: False)")
    # Help text corrected: the actual default is 25, not 50 as previously claimed.
    # NOTE(review): --min_char is parsed but never forwarded to
    # process_traces_folder, so truncate_text always uses its own default (25).
    parser.add_argument("--min_char", type=int, default=25, help="Minimum character length for each string (default: 25)")
    args = parser.parse_args()
    process_traces_folder(args.traces, args.output, args.max_char, args.topk, raw=args.raw, replace=args.replace, hierarchy=args.hierarchy)
    print("Processing complete!")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())