#!/usr/bin/env python3
"""Filter and compress Langfuse/LangSmith trace exports into compact JSON.

Reads JSONL/JSON trace files, keeps only the fields useful for review,
truncates long strings, optionally deduplicates repeated strings into a
``global_variables`` table, and writes one filtered JSON file per input.
"""

import argparse
import json
import re
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from agentgraph.shared.models.platform_models.langsmith import (
    LangFuseObservation,
    LangFuseSession,
    LangFuseTrace,
    LangSmithRun,
    LangSmithTrace,
)


def is_empty_value(value: Any) -> bool:
    """Check if value is empty (null, empty array, empty dict, empty string)."""
    return (
        value is None
        or value == []
        or value == {}
        or value == ""
        or (isinstance(value, str) and value.strip() == "")
    )


def try_parse_json(text: str) -> Any:
    """Return the parsed object when *text* looks like JSON, else None."""
    if text.strip().startswith(("{", "[")):
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return None
    return None


def filter_empty_values(obj: Any) -> Any:
    """Recursively drop empty values (None, [], {}, "", whitespace) from obj.

    Fix: the previous version only tested None/[]/"" in its conditions, so
    empty dicts slipped through even though is_empty_value() treats them as
    empty; emptiness is now decided by is_empty_value() everywhere.
    """
    if isinstance(obj, dict):
        filtered = {}
        for key, value in obj.items():
            if is_empty_value(value):
                continue
            cleaned = filter_empty_values(value)
            # Re-check after recursion: a dict/list may have become empty.
            if not is_empty_value(cleaned):
                filtered[key] = cleaned
        return filtered
    if isinstance(obj, list):
        return [filter_empty_values(item) for item in obj if not is_empty_value(item)]
    return obj


def collect_all_strings(obj: Any, strings: Optional[List[str]] = None) -> List[str]:
    """Collect every "long" string (>50 chars, not containing 'http') in obj."""
    if strings is None:
        strings = []
    if isinstance(obj, str):
        trimmed = obj.strip()
        # Long, non-URL strings are the candidates worth deduplicating.
        if len(trimmed) > 50 and 'http' not in trimmed:
            strings.append(trimmed)
    elif isinstance(obj, dict):
        for value in obj.values():
            collect_all_strings(value, strings)
    elif isinstance(obj, list):
        for item in obj:
            collect_all_strings(item, strings)
    return strings


def count_strings(obj: Any, ignore_keys: Optional[List[str]] = None) -> int:
    """Count the strings in obj, skipping dict entries whose key is ignored."""
    if not ignore_keys:
        ignore_keys = []
    if isinstance(obj, str):
        return 1
    if isinstance(obj, dict):
        return sum(
            count_strings(v, ignore_keys)
            for k, v in obj.items()
            if k not in ignore_keys
        )
    if isinstance(obj, list):
        return sum(count_strings(item, ignore_keys) for item in obj)
    return 0


def find_repeated_patterns(obj: Any, topk: int = 10) -> List[Tuple[str, int, int]]:
    """Find the topk repeated long strings, ranked by bytes saved if shared.

    Returns (pattern, occurrence_count, saved_bytes) tuples, best first.
    Only patterns whose substitution actually saves bytes are kept.
    """
    all_strings = collect_all_strings(obj)
    if not all_strings:
        print("No strings found")
        return []
    # Counter is the idiomatic frequency map (was a manual dict loop).
    patterns = Counter(all_strings)
    repeated = []
    for pattern, count in patterns.items():
        if count > 1:
            # Approximate length of a "${REPEATED_STRINGn}" placeholder.
            var_name_length = 18
            saved_bytes = (len(pattern) - var_name_length) * count
            if saved_bytes > 0:
                repeated.append((pattern, count, saved_bytes))
    if not repeated:
        print("No repeated patterns found")
    else:
        print(f"Found {len(repeated)} repeated patterns")
    repeated.sort(key=lambda x: x[2], reverse=True)
    return repeated[:topk]


def replace_string_in_obj(obj: Any, pattern: str, replacement: str) -> Any:
    """Return a copy of obj with every string equal to pattern replaced.

    Comparison is on the stripped string, matching collect_all_strings().
    """
    if isinstance(obj, str):
        if obj.strip() == pattern:
            return replacement
        return obj
    if isinstance(obj, dict):
        return {k: replace_string_in_obj(v, pattern, replacement) for k, v in obj.items()}
    if isinstance(obj, list):
        return [replace_string_in_obj(item, pattern, replacement) for item in obj]
    return obj


def compress_repeated_strings(obj: Any, topk: int) -> Any:
    """Replace repeated long strings with ${REPEATED_STRINGn} placeholders.

    On success returns a new dict whose first key is "global_variables"
    (the placeholder -> original-string table) followed by obj's keys.
    On any failure the input is returned unchanged (best-effort).
    """
    try:
        repeated_patterns = find_repeated_patterns(obj, topk=topk)
        if not repeated_patterns:
            print("No patterns to compress")
            return obj
        global_variables: Dict[str, str] = {}
        for i, (pattern, _, _) in enumerate(repeated_patterns):
            var_name = f"REPEATED_STRING{i+1}"
            global_variables[var_name] = pattern
            replacement = f"${{{var_name}}}"
            obj = replace_string_in_obj(obj, pattern, replacement)
        if not global_variables:
            print("No global_variables created")
            return obj
        # Fix: only a dict can be merged behind "global_variables"; previously
        # a non-dict obj raised AttributeError at obj.keys(), which was then
        # swallowed by the except below.
        if not isinstance(obj, dict):
            return obj
        compressed_obj: Dict[str, Any] = {"global_variables": global_variables}
        compressed_obj.update(obj)
        print(f"Created {len(global_variables)} global variables")
        return compressed_obj
    except Exception as e:  # best-effort: compression must never break export
        print(f"Compression failed: {e}")
        return obj


def process_filtered_data(filtered_data: Any, topk: int, maximum: int = 512000) -> Any:
    """Compress repeated strings, then truncate to the character budget.

    maximum defaults to 512000: roughly 128k tokens * 4 chars per token.
    """
    compressed_data = compress_repeated_strings(filtered_data, topk=topk)
    return truncate_text(compressed_data, maximum)


def _squeeze_whitespace(text: str) -> str:
    """Strip text and collapse runs of newlines/tabs into single ones."""
    cleaned = re.sub(r"\n+", "\n", text.strip())
    return re.sub(r"\t+", "\t", cleaned)


def _middle_ellipsis(text: str, max_char: int) -> str:
    """Shorten text to ~max_char by keeping its head and tail around "..."."""
    if len(text) <= max_char:
        return text
    half_length = (max_char - 3) // 2
    return text[:half_length] + "..." + text[-half_length:]


def truncate_object(obj: Any, max_char: int, ignore_keys: Optional[List[str]] = None) -> Any:
    """Truncate every string in obj to max_char; drop ignored dict keys.

    Empty values are removed first via filter_empty_values().
    """
    obj = filter_empty_values(obj)
    if not ignore_keys:
        ignore_keys = []
    if isinstance(obj, str):
        return _middle_ellipsis(_squeeze_whitespace(obj), max_char)
    if isinstance(obj, dict):
        return {
            k: truncate_object(v, max_char, ignore_keys)
            for k, v in obj.items()
            if k not in ignore_keys
        }
    if isinstance(obj, list):
        return [truncate_object(item, max_char, ignore_keys) for item in obj]
    return obj


def truncate_text(text: Any, max_length: int, min_char: int = 25,
                  ignore_keys: Optional[List[str]] = None) -> Any:
    """Truncate text (str/dict/list) to a total budget of max_length chars.

    For containers the budget is split evenly across all contained strings,
    with a floor of min_char per string. A dict containing a
    "global_variables" table gets that table truncated with its own fixed
    5000-char budget so the deduplicated strings survive.
    """
    if isinstance(text, str):
        parsed_json = try_parse_json(text)
        if parsed_json is not None:
            # Fix: propagate min_char/ignore_keys (previously dropped here).
            return truncate_text(parsed_json, max_length, min_char, ignore_keys)
        return _middle_ellipsis(_squeeze_whitespace(text), max_length)
    if isinstance(text, (dict, list)):
        if isinstance(text, dict) and "global_variables" in text:
            result = {}
            for k, v in text.items():
                if k == "global_variables":
                    # Fix: also pass ignore_keys through (previously dropped).
                    result[k] = truncate_text(v, 5000, min_char, ignore_keys)
                else:
                    result[k] = truncate_text(v, max_length, min_char, ignore_keys)
            return result
        filtered_text = filter_empty_values(text)
        string_count = count_strings(filtered_text, ignore_keys)
        if string_count == 0:
            return filtered_text
        # Split the character budget evenly, but never below min_char each.
        max_char = max(max_length // string_count, min_char)
        return truncate_object(filtered_text, max_char, ignore_keys=ignore_keys)
    return text


def filter_langfuse_observation(observation: LangFuseObservation,
                                max_char: Optional[int]) -> Dict[str, Any]:
    """Filter Langfuse observation object to keep only required fields."""
    filtered: Dict[str, Any] = {}
    if observation.name:
        filtered["name"] = observation.name
    if observation.type:
        filtered["type"] = observation.type
    if observation.input:
        filtered["input"] = (
            truncate_text(observation.input, max_char)
            if max_char is not None else observation.input
        )
    if observation.output:
        filtered["output"] = (
            truncate_text(observation.output, max_char)
            if max_char is not None else observation.output
        )
    if observation.statusMessage:
        filtered["statusMessage"] = observation.statusMessage
    if observation.level:
        filtered["level"] = observation.level
    # model / costDetails / usage are intentionally omitted to keep output small.
    return filtered


def hierarchy_langfuse_observation(root_obs: LangFuseObservation,
                                   observations: List[LangFuseObservation],
                                   max_char: Optional[int]) -> Dict[str, Any]:
    """Convert Langfuse observation to hierarchy dict structure."""
    obs_dict = {obs.id: obs for obs in observations}
    direct_child_obs_ids = [
        obs.id for obs in observations if obs.parentObservationId == root_obs.id
    ]
    filtered = filter_langfuse_observation(root_obs, max_char)
    if direct_child_obs_ids:
        filtered["child_observations"] = [
            hierarchy_langfuse_observation(obs_dict[obs_id], observations, max_char)
            for obs_id in direct_child_obs_ids
        ]
    return filtered


def set_hierarchy_depths(node: Dict[str, Any], child_key: str, depth: int = 0) -> int:
    """Set depth for hierarchy structure and return max depth."""
    node["depth"] = depth
    max_depth = depth
    if child_key in node:
        for child in node[child_key]:
            child_max = set_hierarchy_depths(child, child_key, depth + 1)
            max_depth = max(max_depth, child_max)
    return max_depth


def filter_langfuse_trace(trace: LangFuseTrace, max_char: Optional[int],
                          hierarchy: bool = False) -> Dict[str, Any]:
    """Filter Langfuse trace object to keep only required fields."""
    filtered: Dict[str, Any] = {}
    if trace.name:
        filtered["name"] = trace.name
    if trace.observations and not hierarchy:
        filtered["observations"] = [
            filter_langfuse_observation(obs, max_char) for obs in trace.observations
        ]
    elif trace.observations and hierarchy:
        # Roots are observations with no parent; each spans its own subtree.
        root_observations = [
            obs for obs in trace.observations if obs.parentObservationId is None
        ]
        filtered["observations"] = [
            hierarchy_langfuse_observation(root_obs, trace.observations, max_char)
            for root_obs in root_observations
        ]
        max_depth = 0
        for obs in filtered["observations"]:
            obs_max = set_hierarchy_depths(obs, "child_observations")
            max_depth = max(max_depth, obs_max)
        filtered["observation_depth"] = max_depth
    if trace.totalCost:
        filtered["total_cost"] = trace.totalCost
    if trace.input:
        filtered["input"] = (
            truncate_text(trace.input, max_char)
            if max_char is not None else trace.input
        )
    if trace.output:
        filtered["output"] = (
            truncate_text(trace.output, max_char)
            if max_char is not None else trace.output
        )
    return filtered


def filter_langsmith_run(run: LangSmithRun, max_char: Optional[int]) -> Dict[str, Any]:
    """Filter LangSmith run object to keep only required fields."""
    filtered: Dict[str, Any] = {}
    # Framework bookkeeping keys that add bulk without review value.
    ignore_keys = [
        "additional_kwargs", "response_metadata", "iterations", "usage_metadata",
        "id", "kwargs", "example", "llm_output", "lc", "metadata",
        "generation_info", "args", "output", "outputs",
    ]
    if run.inputs:
        filtered["inputs"] = (
            truncate_text(run.inputs, max_char, ignore_keys=ignore_keys)
            if max_char is not None else run.inputs
        )
        if filtered["inputs"] == {}:
            del filtered["inputs"]
    # run.outputs is intentionally dropped to keep the export compact.
    if run.error:
        filtered["error"] = (
            truncate_text(run.error, max_char)
            if max_char is not None else run.error
        )
    return filtered


def hierarchy_langsmith_run(root_run: LangSmithRun, runs: List[LangSmithRun],
                            max_char: Optional[int]) -> Dict[str, Any]:
    """Convert LangSmith run to hierarchy dict structure."""
    run_dict = {run.id: run for run in runs}
    direct_child_run_ids = [
        run.id for run in runs if run.parent_run_id == root_run.id
    ]
    filtered = filter_langsmith_run(root_run, max_char)
    if direct_child_run_ids:
        filtered["child_runs"] = [
            hierarchy_langsmith_run(run_dict[run_id], runs, max_char)
            for run_id in direct_child_run_ids
        ]
    return filtered


def filter_langfuse_session(session: LangFuseSession,
                            max_char: Optional[int],
                            topk: int,
                            raw: bool = False,
                            replace: bool = False,
                            hierarchy: bool = False,
                            ) -> Dict[str, Any]:
    """Filter Langfuse session object to keep only required fields."""
    filtered: Dict[str, Any] = {}
    if raw:
        return session.model_dump()
    if session.session_name:
        filtered["name"] = session.session_name
    if session.traces:
        filtered["traces"] = [
            filter_langfuse_trace(trace, max_char, hierarchy)
            for trace in session.traces
        ]
    if replace:
        return process_filtered_data(filtered, topk)
    return filtered


def filter_langsmith_trace(trace: LangSmithTrace,
                           max_char: Optional[int],
                           topk: int,
                           raw: bool = False,
                           replace: bool = False,
                           hierarchy: bool = False,
                           ) -> Dict[str, Any]:
    """Filter LangSmith export data to keep only required fields."""
    filtered: Dict[str, Any] = {}
    if raw:
        return trace.model_dump()
    if trace.trace_name:
        filtered["name"] = trace.trace_name
    if trace.runs and not hierarchy:
        filtered["runs"] = [filter_langsmith_run(run, max_char) for run in trace.runs]
    elif trace.runs and hierarchy:
        # LangSmith exports are rooted at runs[0].
        filtered["runs"] = [hierarchy_langsmith_run(trace.runs[0], trace.runs, max_char)]
        # Guarded inside this branch so filtered["runs"] always exists here
        # (avoids a KeyError when the trace has no runs at all).
        if filtered["runs"]:
            max_depth = set_hierarchy_depths(filtered["runs"][0], "child_runs")
            filtered["run_depth"] = max_depth
    if replace:
        return process_filtered_data(filtered, topk)
    return filtered


def detect_json_format(json_file: Path) -> str:
    """Detect if JSON file is Langfuse (session_id) or LangSmith (trace_id) format."""
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if 'session_id' in data:
            return 'langfuse'
        if 'trace_id' in data:
            return 'langsmith'
        print(f"Warning: Cannot detect format for {json_file.name}, defaulting to LangSmith")
        return 'langsmith'
    except Exception as e:  # best-effort detection; never abort the batch
        print(f"Error reading {json_file.name}: {e}")
        return 'langsmith'


def process_traces_folder(traces_folder: str,
                          output_folder: str,
                          max_char: Optional[int],
                          topk: int,
                          raw: bool = False,
                          replace: bool = False,
                          hierarchy: bool = False,
                          ):
    """Process all trace files in the folder (both JSONL and JSON).

    JSONL files are assumed to be Langfuse sessions; JSON files are sniffed
    via detect_json_format(). One filtered <stem>.json is written per input.

    Raises:
        FileNotFoundError: if traces_folder does not exist.
    """
    traces_path = Path(traces_folder)
    if not traces_path.exists():
        raise FileNotFoundError(f"Traces folder not found: {traces_folder}")
    jsonl_files = list(traces_path.glob("*.jsonl"))
    json_files = list(traces_path.glob("*.json"))
    if not jsonl_files and not json_files:
        print(f"No JSONL or JSON files found in {traces_folder}")
        return
    langfuse_files = list(jsonl_files)
    langsmith_files = []
    for json_file in json_files:
        format_type = detect_json_format(json_file)
        if format_type == 'langfuse':
            langfuse_files.append(json_file)
        else:
            langsmith_files.append(json_file)
    output_path = Path(output_folder)
    output_path.mkdir(parents=True, exist_ok=True)
    for files, is_langfuse in [(langfuse_files, True), (langsmith_files, False)]:
        if files:
            print(f"Found {len(files)} {'Langfuse' if is_langfuse else 'LangSmith'} files")
        for json_file in files:
            with open(json_file, "r", encoding="utf-8") as f:
                export_data = json.load(f)
            if is_langfuse:
                # Langfuse output is wrapped in a list for format parity.
                result = [filter_langfuse_session(
                    LangFuseSession(**export_data), max_char, topk,
                    raw=raw, replace=replace, hierarchy=hierarchy)]
            else:
                result = filter_langsmith_trace(
                    LangSmithTrace(**export_data), max_char, topk,
                    raw=raw, replace=replace, hierarchy=hierarchy)
            output_file = output_path / f"{json_file.stem}.json"
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2)


def main():
    """CLI entry point; returns 0 on success."""
    parser = argparse.ArgumentParser(
        description="Process trace files (JSONL for Langfuse, JSON for LangSmith)")
    parser.add_argument("--traces", required=True,
                        help="Path to traces folder containing JSONL/JSON files")
    parser.add_argument("--output", default="traces",
                        help="Output folder for filtered traces (default: traces)")
    parser.add_argument("--max_char", type=int, default=500,
                        help="Maximum character length for each string (default: 500)")
    parser.add_argument("--topk", type=int, default=10,
                        help="Topk for compression (default: 10)")
    parser.add_argument("--raw", action="store_true",
                        help="Keep raw data (default: False)")
    parser.add_argument("--replace", action="store_true",
                        help="Replace data with compressed data (default: False)")
    parser.add_argument("--hierarchy", action="store_true",
                        help="Use hierarchy for compression (default: False)")
    # Help text fixed: the actual default is 25, not 50 as previously claimed.
    parser.add_argument("--min_char", type=int, default=25,
                        help="Minimum character length for each string (default: 25)")
    args = parser.parse_args()
    # NOTE(review): --min_char is parsed but not wired into processing;
    # truncate_text's default of 25 is always used. Confirm intended.
    process_traces_folder(args.traces, args.output, args.max_char, args.topk,
                          raw=args.raw, replace=args.replace, hierarchy=args.hierarchy)
    print("Processing complete!")
    return 0


if __name__ == "__main__":
    sys.exit(main())