import logging
import re
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

from agentgraph.shared.models.reference_based.entity import Entity
from agentgraph.shared.models.reference_based.relation import Relation
from agentgraph.shared.models.reference_based.content_reference import ContentReference
from agentgraph.input.text_processing.trace_line_processor import TraceLineNumberProcessor

logger = logging.getLogger(__name__)

# Sentinel delimiter used to concatenate multiple prompt snippets when more than one
# reference is resolved. We choose the Unicode "SYMBOL FOR UNIT SEPARATOR" (U+241F)
# which will never legitimately appear inside user-supplied prompt text, eliminating
# delimiter-collision issues seen with the previous "|||" sequence.
MULTI_SNIPPET_DELIMITER = "\u241F"

# Pattern for detecting residual line-number prefixes (e.g. "12: text" or "12| text")
# in resolved prompt content.
# NOTE(review): the previous code tested `'' in joined_prompt`, which is always true
# in Python, so the "line numbers still present" warning fired unconditionally and the
# "clean" branch was unreachable. The intended marker literal appears to have been
# lost; confirm this pattern against the numbering format actually produced by
# TraceLineNumberProcessor / ChunkingService.
_LINE_NUMBER_PATTERN = re.compile(r"^\s*\d+\s*[:|]", re.MULTILINE)


class ContentReferenceResolver:
    """
    Service for resolving ContentReference objects to actual content from original traces.

    This enables efficient content retrieval while maintaining position-based references.
    """

    def __init__(self):
        # Project helper that extracts line-addressed snippets from numbered content.
        self.line_processor = TraceLineNumberProcessor()

    @staticmethod
    def _join_snippets(snippets: List[str]) -> str:
        """
        Join extracted snippets into a single prompt string.

        Any accidental occurrence of MULTI_SNIPPET_DELIMITER inside a snippet is
        scrubbed (replaced with a space) first, so a later split on the delimiter
        yields exactly one piece per original reference. A single snippet is
        returned as-is (after scrubbing) with no delimiter appended.
        """
        safe_snippets = [s.replace(MULTI_SNIPPET_DELIMITER, " ") for s in snippets]
        if len(safe_snippets) == 1:
            return safe_snippets[0]
        return MULTI_SNIPPET_DELIMITER.join(safe_snippets)

    def resolve_entity_prompts(self, entities: List[Entity], original_trace: str,
                               window_metadata: Dict[str, Any]) -> List[Entity]:
        """
        Resolve ContentReference objects in entities to actual prompt content.

        Args:
            entities: List of Entity objects that may contain ContentReference objects
            original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window including character positions
                (currently unused here; kept for interface compatibility)

        Returns:
            List of Entity objects (copies) with resolved prompt content; entities
            whose references fail to resolve keep their original prompt.
        """
        if not entities or not original_trace:
            return entities

        # CRITICAL FIX: Use the same numbering approach as extraction so
        # ContentReferences point to the correct lines.
        numbered_content = self._create_extraction_compatible_numbering(original_trace)

        resolved_entities = []
        resolution_stats = {
            "total_entities": len(entities),
            "entities_with_refs": 0,
            "successful_resolutions": 0,
            "failed_resolutions": 0
        }

        for entity in entities:
            resolved_entity = entity.model_copy()  # Copy to avoid modifying original

            if entity.raw_prompt_ref:
                resolution_stats["entities_with_refs"] += 1

                snippets, is_valid = self.line_processor.extract_content_by_reference(
                    numbered_content, entity.raw_prompt_ref
                )

                # Detailed debug logging to track the resolution process
                logger.debug(f"Entity {entity.id} resolution debug:")
                logger.debug(f" - raw_prompt_ref count: {len(entity.raw_prompt_ref)}")
                for idx, ref in enumerate(entity.raw_prompt_ref):
                    logger.debug(f" - ref[{idx}]: L{ref.line_start}-L{ref.line_end}")
                logger.debug(f" - extracted snippets count: {len(snippets) if snippets else 0}")
                if snippets:
                    for idx, snippet in enumerate(snippets):
                        preview = snippet[:50].replace('\n', '\\n') if snippet else "EMPTY"
                        logger.debug(f" - snippet[{idx}]: {preview}...")

                if snippets:
                    joined_prompt = self._join_snippets(snippets)
                    resolved_entity.raw_prompt = joined_prompt
                    resolution_stats["successful_resolutions"] += 1

                    # Sanity check that line numbers were stripped from the snippet
                    logger.debug(f"Resolved prompt for entity {entity.id}: {len(joined_prompt)} characters")
                    if _LINE_NUMBER_PATTERN.search(joined_prompt):
                        logger.warning(f"Line numbers still present in resolved entity {entity.id}: {joined_prompt[:100]}...")
                    else:
                        logger.debug(f"Entity {entity.id} prompt is clean (no line numbers detected)")
                    if len(snippets) > 1:
                        logger.debug(f" - joined with delimiter, split count will be: {len(snippets)}")
                else:
                    # Keep original prompt if resolution failed
                    resolution_stats["failed_resolutions"] += 1
                    logger.warning(f"Failed to resolve prompt reference for entity {entity.id}")

            resolved_entities.append(resolved_entity)

        logger.info(f"Entity prompt resolution stats: {resolution_stats}")
        return resolved_entities

    def resolve_relation_prompts(self, relations: List[Relation], original_trace: str,
                                 window_metadata: Dict[str, Any]) -> List[Relation]:
        """
        Resolve ContentReference objects in relations to actual interaction prompt content.

        Args:
            relations: List of Relation objects that may contain ContentReference objects
            original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window including character positions
                (currently unused here; kept for interface compatibility)

        Returns:
            List of Relation objects (copies) with resolved interaction prompt content;
            relations whose references fail to resolve keep their original prompt.
        """
        if not relations or not original_trace:
            return relations

        numbered_content = self._create_extraction_compatible_numbering(original_trace)

        resolved_relations = []
        resolution_stats = {
            "total_relations": len(relations),
            "relations_with_refs": 0,
            "successful_resolutions": 0,
            "failed_resolutions": 0
        }

        for relation in relations:
            resolved_relation = relation.model_copy()  # Copy to avoid modifying original

            if relation.interaction_prompt_ref:
                resolution_stats["relations_with_refs"] += 1

                snippets, is_valid = self.line_processor.extract_content_by_reference(
                    numbered_content, relation.interaction_prompt_ref
                )

                if snippets:
                    joined_prompt = self._join_snippets(snippets)
                    resolved_relation.interaction_prompt = joined_prompt
                    resolution_stats["successful_resolutions"] += 1

                    # Sanity check that line numbers were stripped from the snippet
                    logger.debug(f"Resolved interaction prompt for relation {relation.id}: {len(joined_prompt)} characters")
                    if _LINE_NUMBER_PATTERN.search(joined_prompt):
                        logger.warning(f"Line numbers still present in resolved relation {relation.id}: {joined_prompt[:100]}...")
                    else:
                        logger.debug(f"Relation {relation.id} prompt is clean (no line numbers detected)")
                else:
                    # Keep original prompt if resolution failed
                    resolution_stats["failed_resolutions"] += 1
                    logger.warning(f"Failed to resolve interaction prompt reference for relation {relation.id}")

            resolved_relations.append(resolved_relation)

        logger.info(f"Relation prompt resolution stats: {resolution_stats}")
        return resolved_relations

    def resolve_knowledge_graph_content(self, knowledge_graph: Dict[str, Any], original_trace: str,
                                        window_metadata: Dict[str, Any]) -> Dict[str, Any]:
        """
        Resolve all ContentReference objects in a knowledge graph to actual content.

        Args:
            knowledge_graph: Knowledge graph dictionary containing entities and relations
            original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window including character positions

        Returns:
            Knowledge graph (shallow copy) with resolved content references and a
            "content_resolution" entry added under "metadata".
        """
        if not knowledge_graph or not original_trace:
            return knowledge_graph

        resolved_kg = knowledge_graph.copy()

        # Resolve entity prompts (accept dicts or Entity instances)
        if "entities" in resolved_kg:
            entities = [
                Entity(**e) if isinstance(e, dict) else e
                for e in resolved_kg["entities"]
            ]
            resolved_entities = self.resolve_entity_prompts(entities, original_trace, window_metadata)
            resolved_kg["entities"] = [entity.model_dump() for entity in resolved_entities]

        # Resolve relation prompts (accept dicts or Relation instances)
        if "relations" in resolved_kg:
            relations = [
                Relation(**r) if isinstance(r, dict) else r
                for r in resolved_kg["relations"]
            ]
            resolved_relations = self.resolve_relation_prompts(relations, original_trace, window_metadata)
            resolved_kg["relations"] = [relation.model_dump() for relation in resolved_relations]

        # Add resolution metadata. Build a fresh dict: `knowledge_graph.copy()` is
        # shallow, so mutating an existing nested "metadata" dict in place would
        # also mutate the caller's input.
        metadata = dict(resolved_kg.get("metadata") or {})
        metadata["content_resolution"] = {
            "resolved_at": self._get_current_timestamp(),
            "original_trace_length": len(original_trace),
            "resolution_method": "content_reference_resolver"
        }
        resolved_kg["metadata"] = metadata

        logger.info(f"Resolved content references for knowledge graph with {len(resolved_kg.get('entities', []))} entities and {len(resolved_kg.get('relations', []))} relations")
        return resolved_kg

    def validate_content_references(self, content_refs: List[ContentReference],
                                    original_trace: str) -> Dict[str, Any]:
        """
        Validate a list of ContentReference objects against the original trace.

        Args:
            content_refs: List of ContentReference objects to validate
            original_trace: Original trace content

        Returns:
            Validation report dictionary with per-reference details, counts of
            valid/invalid references, and the issues found for each.
        """
        if not content_refs or not original_trace:
            return {"valid_references": 0, "invalid_references": 0, "details": []}

        # Use extraction-compatible numbering for validation
        numbered_content = self._create_extraction_compatible_numbering(original_trace)
        total_lines = len(original_trace.split('\n'))

        validation_report = {
            "total_references": len(content_refs),
            "valid_references": 0,
            "invalid_references": 0,
            "details": []
        }

        for i, content_ref in enumerate(content_refs):
            detail = {
                "index": i,
                "content_type": content_ref.content_type,
                "line_range": f"{content_ref.line_start}-{content_ref.line_end}",
                "is_valid": True,
                "issues": []
            }

            # Line numbers are 1-based; check bounds against the trace
            if content_ref.line_start < 1 or content_ref.line_end < 1:
                detail["is_valid"] = False
                detail["issues"].append("Line numbers must be >= 1")
            if content_ref.line_start > total_lines or content_ref.line_end > total_lines:
                detail["is_valid"] = False
                detail["issues"].append(f"Line numbers exceed total lines ({total_lines})")
            if not content_ref.validate_line_range():
                detail["is_valid"] = False
                detail["issues"].append("line_end must be >= line_start")

            # Try to extract content and validate
            try:
                extracted_content, content_valid = self.line_processor.extract_content_by_reference(
                    numbered_content, content_ref
                )
                if not content_valid:
                    detail["is_valid"] = False
                    detail["issues"].append("Content does not match summary")
            except Exception as e:
                detail["is_valid"] = False
                detail["issues"].append(f"Extraction error: {str(e)}")

            if detail["is_valid"]:
                validation_report["valid_references"] += 1
            else:
                validation_report["invalid_references"] += 1

            validation_report["details"].append(detail)

        return validation_report

    def _create_extraction_compatible_numbering(self, original_trace: str) -> str:
        """
        Create numbered content using the same line numbering scheme as extraction.

        This replicates the line mapping used by ChunkingService so that
        ContentReferences resolve to the correct content.

        Args:
            original_trace: Original trace content (without line numbers)

        Returns:
            Content with each line prefixed per the extraction numbering scheme.
        """
        original_lines = original_trace.split('\n')

        # NOTE(review): despite the method name, each line is prefixed with a single
        # space only — no visible line number is emitted. The line processor appears
        # to address lines positionally (line_start/line_end), so the prefix content
        # may be irrelevant, but confirm against ChunkingService before changing.
        # (A character-to-line map was previously computed here and never used; it
        # has been removed as dead work.)
        numbered_content = '\n'.join(f" {line}" for line in original_lines)

        logger.debug(f"Created extraction-compatible numbering for {len(original_lines)} lines")
        return numbered_content

    def _get_current_timestamp(self) -> str:
        """Get current timestamp in ISO format (naive local time)."""
        return datetime.now().isoformat()

    def get_resolution_statistics(self, knowledge_graph: Dict[str, Any]) -> Dict[str, Any]:
        """
        Get statistics about content references in a knowledge graph.

        Args:
            knowledge_graph: Knowledge graph to analyze (entities/relations as dicts)

        Returns:
            Statistics dictionary with totals, reference counts, and resolved-content
            counts for both entities and relations.
        """
        stats = {
            "entities": {
                "total": 0,
                "with_references": 0,
                "with_resolved_content": 0
            },
            "relations": {
                "total": 0,
                "with_references": 0,
                "with_resolved_content": 0
            }
        }

        # Analyze entities
        if "entities" in knowledge_graph:
            stats["entities"]["total"] = len(knowledge_graph["entities"])
            for entity_data in knowledge_graph["entities"]:
                if entity_data.get("raw_prompt_ref"):
                    stats["entities"]["with_references"] += 1
                if entity_data.get("raw_prompt"):
                    stats["entities"]["with_resolved_content"] += 1

        # Analyze relations
        if "relations" in knowledge_graph:
            stats["relations"]["total"] = len(knowledge_graph["relations"])
            for relation_data in knowledge_graph["relations"]:
                if relation_data.get("interaction_prompt_ref"):
                    stats["relations"]["with_references"] += 1
                if relation_data.get("interaction_prompt"):
                    stats["relations"]["with_resolved_content"] += 1

        return stats