Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| import argparse | |
| import json | |
| import re | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from agentgraph.shared.models.platform_models.langsmith import ( | |
| LangFuseObservation, | |
| LangFuseSession, | |
| LangFuseTrace, | |
| LangSmithRun, | |
| LangSmithTrace, | |
| ) | |
def is_empty_value(value: Any) -> bool:
    """Return True for values considered empty: None, [], {}, "" or whitespace-only strings."""
    if value is None:
        return True
    if isinstance(value, str):
        # Covers both "" and strings containing only whitespace.
        return value.strip() == ""
    return value == [] or value == {}
def try_parse_json(text: str) -> Any:
    """Parse *text* as JSON when it looks like an object/array; otherwise return None."""
    candidate = text.strip()
    if not candidate.startswith(("{", "[")):
        return None
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None
def filter_empty_values(obj: Any) -> Any:
    """Recursively remove empty values from nested dicts and lists.

    A value is empty when it is None, [], {}, "", or a whitespace-only
    string — the same criteria as is_empty_value. Fixes the original's
    inconsistencies: empty dicts used to survive as dict values, and list
    items that were whitespace-only (or became empty after recursion)
    were kept.

    Non-container, non-empty values (including 0 and False) pass through
    unchanged.
    """
    def _empty(value: Any) -> bool:
        # Mirrors is_empty_value; kept local so this function is self-contained.
        if value is None:
            return True
        if isinstance(value, str):
            return value.strip() == ""
        return value == [] or value == {}

    if isinstance(obj, dict):
        cleaned_dict: Dict[str, Any] = {}
        for key, value in obj.items():
            if _empty(value):
                continue
            cleaned = filter_empty_values(value)
            # Re-check: recursion may have emptied the container entirely.
            if not _empty(cleaned):
                cleaned_dict[key] = cleaned
        return cleaned_dict
    if isinstance(obj, list):
        cleaned_list = []
        for item in obj:
            if _empty(item):
                continue
            cleaned = filter_empty_values(item)
            if not _empty(cleaned):
                cleaned_list.append(cleaned)
        return cleaned_list
    return obj
def collect_all_strings(obj: Any, strings: Optional[List[str]] = None) -> List[str]:
    """Gather every long (>50 chars), non-URL-containing string nested in *obj*.

    Strings are stripped before the length test; the accumulator is shared
    across recursive calls and returned.
    """
    acc = [] if strings is None else strings
    if isinstance(obj, dict):
        for value in obj.values():
            collect_all_strings(value, acc)
    elif isinstance(obj, list):
        for entry in obj:
            collect_all_strings(entry, acc)
    elif isinstance(obj, str):
        stripped = obj.strip()
        # Only long strings matter for dedup; anything mentioning 'http' is skipped.
        if len(stripped) > 50 and 'http' not in stripped:
            acc.append(stripped)
    return acc
def count_strings(obj: Any, ignore_keys: Optional[List[str]] = None) -> int:
    """Count string leaves nested in *obj*, skipping dict entries with ignored keys."""
    skip = ignore_keys or []
    if isinstance(obj, str):
        return 1
    if isinstance(obj, dict):
        total = 0
        for key, value in obj.items():
            if key not in skip:
                total += count_strings(value, skip)
        return total
    if isinstance(obj, list):
        total = 0
        for entry in obj:
            total += count_strings(entry, skip)
        return total
    # Numbers, None, booleans, etc. contribute nothing.
    return 0
def find_repeated_patterns(obj: Any, topk: int = 10) -> List[Tuple[str, int, int]]:
    """Find duplicated long strings worth replacing with a variable reference.

    Returns up to *topk* tuples of (pattern, occurrence_count, saved_bytes),
    sorted by estimated savings (largest first). Only patterns whose
    replacement would actually save bytes are included.
    """
    candidates = collect_all_strings(obj)
    if not candidates:
        print("No strings found")
        return []
    counts: Dict[str, int] = {}
    for text in candidates:
        counts[text] = counts.get(text, 0) + 1
    # Approximate length of a "${REPEATED_STRINGn}" placeholder.
    placeholder_len = 18
    repeated = [
        (pattern, occurrences, (len(pattern) - placeholder_len) * occurrences)
        for pattern, occurrences in counts.items()
        if occurrences > 1 and (len(pattern) - placeholder_len) * occurrences > 0
    ]
    if repeated:
        print(f"Found {len(repeated)} repeated patterns")
        repeated.sort(key=lambda entry: entry[2], reverse=True)
    else:
        print("No repeated patterns found")
    return repeated[:topk]
def replace_string_in_obj(obj: Any, pattern: str, replacement: str) -> Any:
    """Return a copy of *obj* where every string equal to *pattern* (after strip) is replaced."""
    if isinstance(obj, dict):
        return {key: replace_string_in_obj(value, pattern, replacement) for key, value in obj.items()}
    if isinstance(obj, list):
        return [replace_string_in_obj(entry, pattern, replacement) for entry in obj]
    if isinstance(obj, str):
        # Match on the stripped value, consistent with collect_all_strings.
        return replacement if obj.strip() == pattern else obj
    return obj
def compress_repeated_strings(obj: Any, topk: int) -> Any:
    """Deduplicate the most expensive repeated strings in *obj*.

    Each repeated pattern is swapped for a "${REPEATED_STRINGn}" placeholder,
    and the patterns are recorded under a leading "global_variables" mapping.
    On any failure the (possibly partially replaced) object is returned —
    this is deliberately best-effort compression.
    """
    try:
        top_patterns = find_repeated_patterns(obj, topk=topk)
        if not top_patterns:
            print("No patterns to compress")
            return obj
        variables: Dict[str, str] = {}
        for index, (pattern, _, _) in enumerate(top_patterns, start=1):
            name = f"REPEATED_STRING{index}"
            variables[name] = pattern
            obj = replace_string_in_obj(obj, pattern, f"${{{name}}}")
        if not variables:
            print("No global_variables created")
            return obj
        # "global_variables" goes first so readers see the legend before the data.
        result = {"global_variables": variables}
        for key in obj.keys():  # NOTE(review): assumes obj is a dict at this point — confirm callers.
            result[key] = obj[key]
        print(f"Created {len(variables)} global variables")
        return result
    except Exception as e:  # Broad on purpose: compression must never fail the pipeline.
        print(f"Compression failed: {e}")
        return obj
def process_filtered_data(filtered_data: Any, topk: int, maximum: int = 512000) -> Any:
    """Compress repeated strings, then cap total size (~128k tokens * 4 = 512k chars)."""
    compressed = compress_repeated_strings(filtered_data, topk=topk)
    return truncate_text(compressed, maximum)
def truncate_object(obj: Any, max_char: int, ignore_keys: Optional[List[str]] = None) -> Any:
    """Recursively trim every string in *obj* to at most *max_char* characters.

    Empty values are filtered out first. Over-long strings keep their head and
    tail joined by "..."; dict entries whose key is in *ignore_keys* are
    dropped entirely.
    """
    obj = filter_empty_values(obj)
    skip = ignore_keys or []
    if isinstance(obj, str):
        # Collapse runs of newlines/tabs before measuring length.
        compact = re.sub(r"\t+", "\t", re.sub(r"\n+", "\n", obj.strip()))
        if len(compact) <= max_char:
            return compact
        keep = (max_char - 3) // 2
        return compact[:keep] + "..." + compact[-keep:]
    if isinstance(obj, dict):
        return {key: truncate_object(value, max_char, skip) for key, value in obj.items() if key not in skip}
    if isinstance(obj, list):
        return [truncate_object(entry, max_char, skip) for entry in obj]
    return obj
def truncate_text(text: Any, max_length: int, min_char: int = 25, ignore_keys: Optional[List[str]] = None) -> Any:
    """Truncate *text* (a string or nested dict/list) to roughly *max_length* chars.

    Strings that parse as JSON are re-parsed and truncated structurally. For
    containers, the per-string budget is max_length divided by the number of
    string leaves, floored at *min_char*.
    """
    if isinstance(text, str):
        # A JSON-looking string is truncated as a structure, not as raw text.
        parsed_json = try_parse_json(text)
        if parsed_json is not None:
            # NOTE(review): min_char and ignore_keys are not forwarded here — confirm intended.
            return truncate_text(parsed_json, max_length)
        trimmed = text.strip()
        # Collapse repeated newlines/tabs so whitespace doesn't eat the budget.
        cleaned = re.sub(r"\n+", "\n", trimmed)
        cleaned = re.sub(r"\t+", "\t", cleaned)
        if len(cleaned) > max_length:
            # Keep head and tail, joined by "...".
            half_length = (max_length - 3) // 2
            first_half = cleaned[:half_length]
            second_half = cleaned[-half_length:]
            return first_half + "..." + second_half
        return cleaned
    elif isinstance(text, (dict, list)):
        if isinstance(text, dict) and "global_variables" in text:
            # Compressed payload: the variable legend gets a fixed generous budget.
            result = {}
            for k, v in text.items():
                if k == "global_variables":
                    result[k] = truncate_text(v, 5000, min_char)
                else:
                    result[k] = truncate_text(v, max_length, min_char)
            return result
        filtered_text = filter_empty_values(text)
        string_count = count_strings(filtered_text, ignore_keys)
        if string_count == 0:
            return filtered_text
        # Split the budget evenly across string leaves, but never below min_char.
        max_char = max_length // string_count
        if max_char < min_char:
            max_char = min_char
        return truncate_object(filtered_text, max_char, ignore_keys=ignore_keys)
    else:
        # Non-string, non-container values pass through untouched.
        return text
def filter_langfuse_observation(observation: LangFuseObservation, max_char: Optional[int]) -> Dict[str, Any]:
    """Project a Langfuse observation onto the small set of fields we keep.

    input/output are truncated when *max_char* is set. Model, cost, and usage
    fields are intentionally excluded from the output.
    """
    result: Dict[str, Any] = {}
    if observation.name:
        result["name"] = observation.name
    if observation.type:
        result["type"] = observation.type
    if observation.input:
        result["input"] = observation.input if max_char is None else truncate_text(observation.input, max_char)
    if observation.output:
        result["output"] = observation.output if max_char is None else truncate_text(observation.output, max_char)
    if observation.statusMessage:
        result["statusMessage"] = observation.statusMessage
    if observation.level:
        result["level"] = observation.level
    return result
def hierarchy_langfuse_observation(root_obs: LangFuseObservation, observations: List[LangFuseObservation], max_char: Optional[int]) -> Dict[str, Any]:
    """Convert a Langfuse observation and its descendants into a nested dict.

    Children are attached under "child_observations" in input order. The
    parent->children index is built once up front; the original rebuilt an
    id->observation map and rescanned the full list on every recursive call,
    which was O(n^2) for n observations.
    """
    children_by_parent: Dict[Any, List[LangFuseObservation]] = {}
    for obs in observations:
        children_by_parent.setdefault(obs.parentObservationId, []).append(obs)

    def _build(node: LangFuseObservation) -> Dict[str, Any]:
        # One filtered dict per node; recurse into direct children only.
        filtered = filter_langfuse_observation(node, max_char)
        children = children_by_parent.get(node.id, [])
        if children:
            filtered["child_observations"] = [_build(child) for child in children]
        return filtered

    return _build(root_obs)
def set_hierarchy_depths(node: Dict[str, Any], child_key: str, depth: int = 0) -> int:
    """Annotate *node* and its descendants (in place) with a "depth" key.

    Children are looked up under *child_key*; returns the maximum depth found
    in the subtree.
    """
    node["depth"] = depth
    deepest = depth
    for child in node.get(child_key, []):
        deepest = max(deepest, set_hierarchy_depths(child, child_key, depth + 1))
    return deepest
def filter_langfuse_trace(trace: LangFuseTrace, max_char: Optional[int], hierarchy: bool = False) -> Dict[str, Any]:
    """Reduce a Langfuse trace to name, observations, total cost, and truncated I/O.

    With hierarchy=True, observations are nested by parentObservationId and an
    "observation_depth" field records the deepest nesting level found.
    """
    result: Dict[str, Any] = {}
    if trace.name:
        result["name"] = trace.name
    if trace.observations:
        if hierarchy:
            roots = [obs for obs in trace.observations if obs.parentObservationId is None]
            result["observations"] = [
                hierarchy_langfuse_observation(root, trace.observations, max_char) for root in roots
            ]
            deepest = 0
            for subtree in result["observations"]:
                deepest = max(deepest, set_hierarchy_depths(subtree, "child_observations"))
            result["observation_depth"] = deepest
        else:
            result["observations"] = [filter_langfuse_observation(obs, max_char) for obs in trace.observations]
    if trace.totalCost:
        result["total_cost"] = trace.totalCost
    if trace.input:
        result["input"] = trace.input if max_char is None else truncate_text(trace.input, max_char)
    if trace.output:
        result["output"] = trace.output if max_char is None else truncate_text(trace.output, max_char)
    return result
def filter_langsmith_run(run: LangSmithRun, max_char: Optional[int]) -> Dict[str, Any]:
    """Reduce a LangSmith run to its (truncated) inputs and error.

    Noisy metadata keys are excluded from the truncation budget and output.
    Run outputs are intentionally omitted.
    """
    # Keys stripped from run payloads before truncation.
    ignore_keys = [
        "additional_kwargs",
        "response_metadata",
        "iterations",
        "usage_metadata",
        "id",
        "kwargs",
        "example",
        "llm_output",
        "lc",
        "metadata",
        "generation_info",
        "args",
        "output",
        "outputs",
    ]
    result: Dict[str, Any] = {}
    if run.inputs:
        inputs = run.inputs if max_char is None else truncate_text(run.inputs, max_char, ignore_keys=ignore_keys)
        # Truncation/filtering may empty the dict; drop the key rather than keep {}.
        if inputs != {}:
            result["inputs"] = inputs
    if run.error:
        result["error"] = run.error if max_char is None else truncate_text(run.error, max_char)
    return result
def hierarchy_langsmith_run(root_run: LangSmithRun, runs: List[LangSmithRun], max_char: Optional[int]) -> Dict[str, Any]:
    """Convert a LangSmith run and its descendants into a nested dict.

    Children are attached under "child_runs" in input order. The
    parent->children index is built once up front; the original rebuilt an
    id->run map and rescanned the full list on every recursive call, which
    was O(n^2) for n runs.
    """
    children_by_parent: Dict[Any, List[LangSmithRun]] = {}
    for run in runs:
        children_by_parent.setdefault(run.parent_run_id, []).append(run)

    def _build(node: LangSmithRun) -> Dict[str, Any]:
        # One filtered dict per node; recurse into direct children only.
        filtered = filter_langsmith_run(node, max_char)
        children = children_by_parent.get(node.id, [])
        if children:
            filtered["child_runs"] = [_build(child) for child in children]
        return filtered

    return _build(root_run)
def filter_langfuse_session(session: LangFuseSession,
                            max_char: Optional[int],
                            topk: int,
                            raw: bool = False,
                            replace: bool = False,
                            hierarchy: bool = False,
                            ) -> Dict[str, Any]:
    """Reduce a Langfuse session to its name and filtered traces.

    raw=True returns the full model dump untouched. replace=True additionally
    compresses repeated strings and caps the overall size.
    """
    if raw:
        return session.model_dump()
    result: Dict[str, Any] = {}
    if session.session_name:
        result["name"] = session.session_name
    if session.traces:
        result["traces"] = [filter_langfuse_trace(t, max_char, hierarchy) for t in session.traces]
    return process_filtered_data(result, topk) if replace else result
def filter_langsmith_trace(trace: LangSmithTrace,
                           max_char: Optional[int],
                           topk: int,
                           raw: bool = False,
                           replace: bool = False,
                           hierarchy: bool = False,
                           ) -> Dict[str, Any]:
    """Reduce a LangSmith trace export to its name and filtered runs.

    raw=True returns the full model dump untouched. hierarchy=True nests runs
    by parent_run_id and records "run_depth". replace=True additionally
    compresses repeated strings and caps the overall size.
    """
    if raw:
        return trace.model_dump()
    result: Dict[str, Any] = {}
    if trace.trace_name:
        result["name"] = trace.trace_name
    if trace.runs:
        if hierarchy:
            # NOTE(review): assumes runs[0] is the root run — confirm exports guarantee this
            # (the Langfuse path finds roots via parentObservationId is None instead).
            result["runs"] = [hierarchy_langsmith_run(trace.runs[0], trace.runs, max_char)]
            if result["runs"]:
                result["run_depth"] = set_hierarchy_depths(result["runs"][0], "child_runs")
        else:
            result["runs"] = [filter_langsmith_run(run, max_char) for run in trace.runs]
    return process_filtered_data(result, topk) if replace else result
def detect_json_format(json_file: Path) -> str:
    """Classify a JSON file as 'langfuse' (has session_id) or 'langsmith' (has trace_id).

    Unknown or unreadable files default to 'langsmith' with a printed warning.
    """
    try:
        with open(json_file, 'r') as f:
            payload = json.load(f)
        if 'session_id' in payload:
            return 'langfuse'
        if 'trace_id' in payload:
            return 'langsmith'
        print(f"Warning: Cannot detect format for {json_file.name}, defaulting to LangSmith")
        return 'langsmith'
    except Exception as e:  # Best-effort sniffing: never abort on a bad file.
        print(f"Error reading {json_file.name}: {e}")
        return 'langsmith'
def process_traces_folder(traces_folder: str,
                          output_folder: str,
                          max_char: Optional[int],
                          topk: int,
                          raw: bool = False,
                          replace: bool = False,
                          hierarchy: bool = False,
                          ):
    """Filter every trace file in *traces_folder* and write results to *output_folder*.

    .jsonl files are assumed to be Langfuse; .json files are sniffed via
    detect_json_format. Each result is written as <stem>.json in the output
    folder (created if missing).

    Raises FileNotFoundError when the traces folder does not exist.
    """
    src = Path(traces_folder)
    if not src.exists():
        raise FileNotFoundError(f"Traces folder not found: {traces_folder}")
    jsonl_files = list(src.glob("*.jsonl"))
    json_files = list(src.glob("*.json"))
    if not jsonl_files and not json_files:
        print(f"No JSONL or JSON files found in {traces_folder}")
        return
    langfuse_files = list(jsonl_files)
    langsmith_files = []
    for candidate in json_files:
        if detect_json_format(candidate) == 'langfuse':
            langfuse_files.append(candidate)
        else:
            langsmith_files.append(candidate)
    dst = Path(output_folder)
    dst.mkdir(parents=True, exist_ok=True)
    for files, is_langfuse in ((langfuse_files, True), (langsmith_files, False)):
        if not files:
            continue
        print(f"Found {len(files)} {'Langfuse' if is_langfuse else 'LangSmith'} files")
        for trace_file in files:
            # NOTE(review): .jsonl files are parsed with json.load as a single
            # document — confirm the exports are one JSON object, not line-delimited.
            with open(trace_file, "r") as f:
                export_data = json.load(f)
            if is_langfuse:
                # Langfuse results are wrapped in a list for downstream consumers.
                result = [filter_langfuse_session(LangFuseSession(**export_data), max_char, topk, raw=raw, replace=replace, hierarchy=hierarchy)]
            else:
                result = filter_langsmith_trace(LangSmithTrace(**export_data), max_char, topk, raw=raw, replace=replace, hierarchy=hierarchy)
            with open(dst / f"{trace_file.stem}.json", "w") as f:
                json.dump(result, f, indent=2)
def main():
    """CLI entry point: parse arguments and process the traces folder.

    Returns 0 on success (used as the process exit code).
    """
    parser = argparse.ArgumentParser(description="Process trace files (JSONL for Langfuse, JSON for LangSmith)")
    parser.add_argument("--traces", required=True, help="Path to traces folder containing JSONL/JSON files")
    parser.add_argument("--output", default="traces", help="Output folder for filtered traces (default: traces)")
    parser.add_argument("--max_char", type=int, default=500, help="Maximum character length for each string (default: 500)")
    parser.add_argument("--topk", type=int, default=10, help="Topk for compression (default: 10)")
    parser.add_argument("--raw", action="store_true", help="Keep raw data (default: False)")
    parser.add_argument("--replace", action="store_true", help="Replace data with compressed data (default: False)")
    parser.add_argument("--hierarchy", action="store_true", help="Use hierarchy for compression (default: False)")
    # Fixed: help text previously claimed "default: 50" while the actual default is 25.
    # NOTE(review): --min_char is parsed but never forwarded to any function — confirm intent.
    parser.add_argument("--min_char", type=int, default=25, help="Minimum character length for each string (default: 25)")
    args = parser.parse_args()
    process_traces_folder(args.traces, args.output, args.max_char, args.topk, raw=args.raw, replace=args.replace, hierarchy=args.hierarchy)
    print("Processing complete!")
    return 0


if __name__ == "__main__":
    exit(main())