Spaces:
Configuration error
Configuration error
| """ | |
| Live Document Tracer | |
| Real-time streaming of document-centric provenance events. | |
| This is the LIVE version of what the export system freezes. | |
| Instead of: Model runs β Process β Export frozen provenance | |
| We do: Model runs β STREAM events β View live document highlights | |
| Same data model as the observer/exporter, just streamed in real-time | |
| with document snippet context attached. | |
| Usage: | |
| # Create observer with live streaming | |
| observer = DatasetObserver("my_pipeline") | |
| tracer = LiveDocumentTracer(observer) | |
| # Subscribe to events | |
| tracer.on_event(my_handler) | |
| # Or stream to async consumer | |
| async for event in tracer.stream(): | |
| render_highlight(event) | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple | |
| from queue import Queue | |
| from threading import Lock | |
| from pathlib import Path | |
| class TraceEventType(Enum): | |
| """Types of document trace events.""" | |
| # Data flow events | |
| DOCUMENT_TOUCHED = "document_touched" # Model accessed this document/record | |
| SPAN_HIGHLIGHTED = "span_highlighted" # Specific text span being processed | |
| ASSOCIATION_CREATED = "association_created" # Link between two spans/documents | |
| # Activity events | |
| ACTIVITY_STARTED = "activity_started" | |
| ACTIVITY_PROGRESS = "activity_progress" | |
| ACTIVITY_COMPLETED = "activity_completed" | |
| # Entity events | |
| ENTITY_CREATED = "entity_created" | |
| ENTITY_DERIVED = "entity_derived" | |
| # Relationship events | |
| LINK_CREATED = "link_created" | |
| class DocumentSpan: | |
| """ | |
| A span within a document being traced. | |
| This is the atomic unit of live visualization - | |
| the specific text/content the model is touching. | |
| """ | |
| document_id: str # Entity or record ID | |
| document_name: str # Human-readable name | |
| field_name: str = "" # Column/field if applicable | |
| row_index: int = -1 # Row if applicable | |
| # The actual content span | |
| text: str = "" # The snippet text | |
| start_char: int = -1 # Start position in full text | |
| end_char: int = -1 # End position in full text | |
| # Visual hints | |
| highlight_type: str = "default" # "source", "target", "match", "attention" | |
| confidence: float = 1.0 # For attention/relevance visualization | |
| # Metadata | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "document_id": self.document_id, | |
| "document_name": self.document_name, | |
| "field_name": self.field_name, | |
| "row_index": self.row_index, | |
| "text": self.text, | |
| "start_char": self.start_char, | |
| "end_char": self.end_char, | |
| "highlight_type": self.highlight_type, | |
| "confidence": self.confidence, | |
| "metadata": self.metadata, | |
| } | |
| class DocumentAssociation: | |
| """ | |
| An association between two document spans. | |
| Represents the model saying "this connects to that". | |
| """ | |
| source: DocumentSpan | |
| target: DocumentSpan | |
| association_type: str = "related" # "match", "derived", "similar", "references" | |
| confidence: float = 1.0 | |
| # Why this association was made | |
| reason: str = "" | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "source": self.source.to_dict(), | |
| "target": self.target.to_dict(), | |
| "association_type": self.association_type, | |
| "confidence": self.confidence, | |
| "reason": self.reason, | |
| } | |
| class TraceEvent: | |
| """ | |
| A single trace event for live document visualization. | |
| This is what gets streamed to the UI in real-time. | |
| """ | |
| event_type: TraceEventType | |
| timestamp: float = field(default_factory=time.time) | |
| # Activity context | |
| activity_id: Optional[str] = None | |
| activity_name: Optional[str] = None | |
| activity_type: Optional[str] = None | |
| # Document spans involved | |
| spans: List[DocumentSpan] = field(default_factory=list) | |
| # Association if this event creates one | |
| association: Optional[DocumentAssociation] = None | |
| # Progress for long operations | |
| progress: Optional[float] = None # 0.0 to 1.0 | |
| progress_message: Optional[str] = None | |
| # Raw provenance data (for export compatibility) | |
| entity_id: Optional[str] = None | |
| relationship_type: Optional[str] = None | |
| # Metadata | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "event_type": self.event_type.value, | |
| "timestamp": self.timestamp, | |
| "activity_id": self.activity_id, | |
| "activity_name": self.activity_name, | |
| "activity_type": self.activity_type, | |
| "spans": [s.to_dict() for s in self.spans], | |
| "association": self.association.to_dict() if self.association else None, | |
| "progress": self.progress, | |
| "progress_message": self.progress_message, | |
| "entity_id": self.entity_id, | |
| "metadata": self.metadata, | |
| } | |
| def to_json(self) -> str: | |
| return json.dumps(self.to_dict(), default=str) | |
| class LiveDocumentTracer: | |
| """ | |
| Real-time document tracing for live visualization. | |
| Hooks into DatasetObserver to stream events as they happen, | |
| enriched with document snippet context for visualization. | |
| This is the LIVE version of what CroissantExporter freezes. | |
| NEW: Now writes all events to a tape file (JSONL) for buffered playback! | |
| """ | |
| def __init__(self, observer=None, buffer_size: int = 1000, log_dir: str = "./logs"): | |
| """ | |
| Initialize tracer. | |
| Args: | |
| observer: DatasetObserver to hook into (optional) | |
| buffer_size: Max events to buffer for replay | |
| log_dir: Directory for tape files (JSONL logs) | |
| """ | |
| self.observer = observer | |
| self.buffer_size = buffer_size | |
| # Event subscribers | |
| self._handlers: List[Callable[[TraceEvent], None]] = [] | |
| self._async_handlers: List[Callable[[TraceEvent], Any]] = [] | |
| # Event buffer for replay/late subscribers | |
| self._buffer: List[TraceEvent] = [] | |
| self._buffer_lock = Lock() | |
| # Async queue for streaming | |
| self._async_queue: Optional[asyncio.Queue] = None | |
| # Current activity context | |
| self._current_activity_id: Optional[str] = None | |
| self._current_activity_name: Optional[str] = None | |
| self._current_activity_type: Optional[str] = None | |
| # Document context cache | |
| self._document_cache: Dict[str, Dict[str, Any]] = {} | |
| # === TAPE FILE FOR PLAYBACK === | |
| self._log_dir = Path(log_dir) | |
| self._log_dir.mkdir(parents=True, exist_ok=True) | |
| self._session_id = int(time.time()) | |
| self._tape_path = self._log_dir / f"unity_tape_{self._session_id}.jsonl" | |
| self._tape_file = None | |
| self._tape_lock = Lock() | |
| self._event_count = 0 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SUBSCRIPTION | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def on_event(self, handler: Callable[[TraceEvent], None]): | |
| """Subscribe to trace events (sync handler).""" | |
| self._handlers.append(handler) | |
| return self # Allow chaining | |
| def on_event_async(self, handler: Callable[[TraceEvent], Any]): | |
| """Subscribe to trace events (async handler).""" | |
| self._async_handlers.append(handler) | |
| return self | |
| def remove_handler(self, handler): | |
| """Unsubscribe a handler.""" | |
| if handler in self._handlers: | |
| self._handlers.remove(handler) | |
| if handler in self._async_handlers: | |
| self._async_handlers.remove(handler) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # EVENT EMISSION | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def emit(self, event: TraceEvent): | |
| """ | |
| Emit a trace event to all subscribers. | |
| Called internally when provenance events occur. | |
| Also writes to tape file for buffered playback! | |
| """ | |
| self._event_count += 1 | |
| # Add to buffer | |
| with self._buffer_lock: | |
| self._buffer.append(event) | |
| if len(self._buffer) > self.buffer_size: | |
| self._buffer.pop(0) | |
| # === WRITE TO TAPE (JSONL) === | |
| self._write_to_tape(event) | |
| # Call sync handlers | |
| for handler in self._handlers: | |
| try: | |
| handler(event) | |
| except Exception as e: | |
| print(f"Handler error: {e}") | |
| # Queue for async handlers | |
| if self._async_queue: | |
| try: | |
| self._async_queue.put_nowait(event) | |
| except asyncio.QueueFull: | |
| pass # Drop if queue full | |
| def _write_to_tape(self, event: TraceEvent): | |
| """Write event to tape file for later playback.""" | |
| try: | |
| with self._tape_lock: | |
| # Lazy open the file | |
| if self._tape_file is None: | |
| self._tape_file = open(self._tape_path, "a", encoding="utf-8") | |
| print(f"[CASCADE] πΌ Unity tape started: {self._tape_path}") | |
| # Build tape record with full context | |
| record = { | |
| "seq": self._event_count, | |
| "event": event.to_dict(), | |
| "session_id": self._session_id, | |
| } | |
| json_line = json.dumps(record, default=str) + "\n" | |
| self._tape_file.write(json_line) | |
| self._tape_file.flush() | |
| # Debug: Log first few events | |
| if self._event_count <= 3: | |
| print(f"[CASCADE] π Wrote event {self._event_count} to tape: {event.event_type}") | |
| except Exception as e: | |
| # Don't let tape errors break the main flow | |
| print(f"[CASCADE] β οΈ Tape write error: {e}") | |
| pass | |
| def _write_raw_to_tape(self, record: Dict[str, Any]): | |
| """Write a raw record to tape file (for docspace events).""" | |
| try: | |
| with self._tape_lock: | |
| # Lazy open the file | |
| if self._tape_file is None: | |
| self._tape_file = open(self._tape_path, "a", encoding="utf-8") | |
| print(f"[CASCADE] πΌ Unity tape started: {self._tape_path}") | |
| self._tape_file.write(json.dumps(record, default=str) + "\n") | |
| self._tape_file.flush() | |
| except Exception: | |
| pass | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DOCUMENT SPACE EVENTS (for polling iframe) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def emit_entity(self, entity_id: str, source: str, text: str, index: int, side: str = "a"): | |
| """ | |
| Emit an entity for Document Space visualization. | |
| Args: | |
| entity_id: Unique ID for the entity | |
| source: Source dataset name | |
| text: Preview text (truncated) | |
| index: Row index in dataset | |
| side: "a" or "b" to indicate which dataset | |
| """ | |
| self._event_count += 1 | |
| record = { | |
| "seq": self._event_count, | |
| "type": "docspace_entity", | |
| "side": side, | |
| "data": { | |
| "id": entity_id, | |
| "source": source, | |
| "text": text[:200], | |
| "index": index, | |
| }, | |
| "session_id": self._session_id, | |
| } | |
| self._write_raw_to_tape(record) | |
| def emit_match(self, doc_a_id: str, doc_b_id: str, score: float): | |
| """ | |
| Emit a match for Document Space visualization. | |
| Args: | |
| doc_a_id: ID of entity from dataset A | |
| doc_b_id: ID of entity from dataset B | |
| score: Similarity score (0-1) | |
| """ | |
| self._event_count += 1 | |
| record = { | |
| "seq": self._event_count, | |
| "type": "docspace_match", | |
| "data": { | |
| "docA": doc_a_id, | |
| "docB": doc_b_id, | |
| "score": float(score), | |
| }, | |
| "session_id": self._session_id, | |
| } | |
| self._write_raw_to_tape(record) | |
| def emit_phase(self, phase: str, progress: float, message: str = ""): | |
| """ | |
| Emit a phase update for Document Space. | |
| Args: | |
| phase: Current phase (embedding_a, embedding_b, comparing, complete) | |
| progress: Progress 0-1 | |
| message: Status message | |
| """ | |
| self._event_count += 1 | |
| record = { | |
| "seq": self._event_count, | |
| "type": "docspace_phase", | |
| "data": { | |
| "phase": phase, | |
| "progress": float(progress), | |
| "message": message, | |
| }, | |
| "session_id": self._session_id, | |
| } | |
| self._write_raw_to_tape(record) | |
| def close_tape(self): | |
| """Close the tape file (call when session ends).""" | |
| with self._tape_lock: | |
| if self._tape_file: | |
| self._tape_file.close() | |
| self._tape_file = None | |
| print(f"[CASCADE] πΌ Unity tape closed: {self._event_count} events β {self._tape_path}") | |
| def get_tape_path(self) -> Optional[Path]: | |
| """Get the path to the current tape file (whether open or not).""" | |
| return self._tape_path | |
| def load_tape(tape_path: str) -> List[Dict[str, Any]]: | |
| """ | |
| Load events from a tape file for playback. | |
| Args: | |
| tape_path: Path to the .jsonl tape file | |
| Returns: | |
| List of event records in chronological order | |
| """ | |
| events = [] | |
| with open(tape_path, "r", encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| try: | |
| events.append(json.loads(line)) | |
| except json.JSONDecodeError: | |
| pass # Skip malformed lines | |
| return events | |
| async def stream(self) -> Generator[TraceEvent, None, None]: | |
| """ | |
| Async generator for streaming events. | |
| Usage: | |
| async for event in tracer.stream(): | |
| await render(event) | |
| """ | |
| self._async_queue = asyncio.Queue(maxsize=self.buffer_size) | |
| # Replay buffer first | |
| with self._buffer_lock: | |
| for event in self._buffer: | |
| yield event | |
| # Then stream new events | |
| while True: | |
| event = await self._async_queue.get() | |
| yield event | |
| def get_buffer(self) -> List[TraceEvent]: | |
| """Get buffered events for replay.""" | |
| with self._buffer_lock: | |
| return list(self._buffer) | |
| def clear_buffer(self): | |
| """Clear the event buffer.""" | |
| with self._buffer_lock: | |
| self._buffer.clear() | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TRACING API - Call these to emit events | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def start_activity( | |
| self, | |
| activity_id: str, | |
| activity_name: str, | |
| activity_type: str = "transform", | |
| ): | |
| """Signal start of an activity (for context).""" | |
| self._current_activity_id = activity_id | |
| self._current_activity_name = activity_name | |
| self._current_activity_type = activity_type | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.ACTIVITY_STARTED, | |
| activity_id=activity_id, | |
| activity_name=activity_name, | |
| activity_type=activity_type, | |
| )) | |
| def end_activity(self, activity_id: str = None): | |
| """Signal end of an activity.""" | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.ACTIVITY_COMPLETED, | |
| activity_id=activity_id or self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| activity_type=self._current_activity_type, | |
| )) | |
| self._current_activity_id = None | |
| self._current_activity_name = None | |
| self._current_activity_type = None | |
| def report_progress( | |
| self, | |
| progress: float, | |
| message: str = "", | |
| activity_id: str = None, | |
| ): | |
| """Report progress on current activity.""" | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.ACTIVITY_PROGRESS, | |
| activity_id=activity_id or self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| progress=progress, | |
| progress_message=message, | |
| )) | |
| def touch_document( | |
| self, | |
| document_id: str, | |
| document_name: str, | |
| snippet: str = "", | |
| field_name: str = "", | |
| row_index: int = -1, | |
| highlight_type: str = "default", | |
| confidence: float = 1.0, | |
| **metadata, | |
| ): | |
| """ | |
| Signal that the model touched a document/record. | |
| This creates a highlight in the live view. | |
| """ | |
| span = DocumentSpan( | |
| document_id=document_id, | |
| document_name=document_name, | |
| field_name=field_name, | |
| row_index=row_index, | |
| text=snippet, | |
| highlight_type=highlight_type, | |
| confidence=confidence, | |
| metadata=metadata, | |
| ) | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.DOCUMENT_TOUCHED, | |
| activity_id=self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| activity_type=self._current_activity_type, | |
| spans=[span], | |
| entity_id=document_id, | |
| metadata=metadata, | |
| )) | |
| return span | |
| def highlight_span( | |
| self, | |
| document_id: str, | |
| document_name: str, | |
| text: str, | |
| start_char: int = -1, | |
| end_char: int = -1, | |
| field_name: str = "", | |
| row_index: int = -1, | |
| highlight_type: str = "attention", | |
| confidence: float = 1.0, | |
| **metadata, | |
| ): | |
| """ | |
| Highlight a specific span within a document. | |
| For showing exactly where in the text the model is focusing. | |
| """ | |
| span = DocumentSpan( | |
| document_id=document_id, | |
| document_name=document_name, | |
| field_name=field_name, | |
| row_index=row_index, | |
| text=text, | |
| start_char=start_char, | |
| end_char=end_char, | |
| highlight_type=highlight_type, | |
| confidence=confidence, | |
| metadata=metadata, | |
| ) | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.SPAN_HIGHLIGHTED, | |
| activity_id=self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| activity_type=self._current_activity_type, | |
| spans=[span], | |
| metadata=metadata, | |
| )) | |
| return span | |
| def create_association( | |
| self, | |
| source_doc_id: str, | |
| source_doc_name: str, | |
| source_text: str, | |
| target_doc_id: str, | |
| target_doc_name: str, | |
| target_text: str, | |
| association_type: str = "related", | |
| confidence: float = 1.0, | |
| reason: str = "", | |
| **metadata, | |
| ): | |
| """ | |
| Create an association between two document spans. | |
| This is the "A connects to B" visualization. | |
| """ | |
| source = DocumentSpan( | |
| document_id=source_doc_id, | |
| document_name=source_doc_name, | |
| text=source_text, | |
| highlight_type="source", | |
| confidence=confidence, | |
| ) | |
| target = DocumentSpan( | |
| document_id=target_doc_id, | |
| document_name=target_doc_name, | |
| text=target_text, | |
| highlight_type="target", | |
| confidence=confidence, | |
| ) | |
| association = DocumentAssociation( | |
| source=source, | |
| target=target, | |
| association_type=association_type, | |
| confidence=confidence, | |
| reason=reason, | |
| ) | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.ASSOCIATION_CREATED, | |
| activity_id=self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| activity_type=self._current_activity_type, | |
| spans=[source, target], | |
| association=association, | |
| metadata=metadata, | |
| )) | |
| return association | |
| def entity_created( | |
| self, | |
| entity_id: str, | |
| entity_name: str, | |
| record_count: int = None, | |
| **metadata, | |
| ): | |
| """Signal that a new entity was created in provenance.""" | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.ENTITY_CREATED, | |
| activity_id=self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| entity_id=entity_id, | |
| metadata={"name": entity_name, "record_count": record_count, **metadata}, | |
| )) | |
| def entity_derived( | |
| self, | |
| derived_id: str, | |
| derived_name: str, | |
| source_ids: List[str], | |
| **metadata, | |
| ): | |
| """Signal that an entity was derived from others.""" | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.ENTITY_DERIVED, | |
| activity_id=self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| entity_id=derived_id, | |
| metadata={"name": derived_name, "sources": source_ids, **metadata}, | |
| )) | |
| def link_created( | |
| self, | |
| source_id: str, | |
| target_id: str, | |
| relationship_type: str, | |
| **metadata, | |
| ): | |
| """Signal that a provenance link was created.""" | |
| self.emit(TraceEvent( | |
| event_type=TraceEventType.LINK_CREATED, | |
| activity_id=self._current_activity_id, | |
| activity_name=self._current_activity_name, | |
| relationship_type=relationship_type, | |
| metadata={"source": source_id, "target": target_id, **metadata}, | |
| )) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # EXPORT (Freeze the live state) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def export_session(self) -> Dict[str, Any]: | |
| """ | |
| Export the trace session as frozen data. | |
| This is the bridge between live and export - | |
| same data, just frozen at a point in time. | |
| """ | |
| with self._buffer_lock: | |
| return { | |
| "events": [e.to_dict() for e in self._buffer], | |
| "event_count": len(self._buffer), | |
| "exported_at": time.time(), | |
| } | |
| def export_associations(self) -> List[Dict[str, Any]]: | |
| """Export just the associations for visualization.""" | |
| associations = [] | |
| with self._buffer_lock: | |
| for event in self._buffer: | |
| if event.association: | |
| associations.append(event.association.to_dict()) | |
| return associations | |
| def export_timeline(self) -> List[Dict[str, Any]]: | |
| """Export events as a timeline.""" | |
| timeline = [] | |
| with self._buffer_lock: | |
| for event in self._buffer: | |
| timeline.append({ | |
| "timestamp": event.timestamp, | |
| "type": event.event_type.value, | |
| "activity": event.activity_name, | |
| "spans": len(event.spans), | |
| "has_association": event.association is not None, | |
| }) | |
| return timeline | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONSOLE RENDERER - Simple text-based live view | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class ConsoleTraceRenderer: | |
| """ | |
| Simple console renderer for live document traces. | |
| Good for debugging and terminal-based workflows. | |
| """ | |
| def __init__(self, show_snippets: bool = True, max_snippet_len: int = 80): | |
| self.show_snippets = show_snippets | |
| self.max_snippet_len = max_snippet_len | |
| def render(self, event: TraceEvent): | |
| """Render event to console.""" | |
| timestamp = time.strftime("%H:%M:%S", time.localtime(event.timestamp)) | |
| if event.event_type == TraceEventType.ACTIVITY_STARTED: | |
| print(f"\n[{timestamp}] βΆ {event.activity_name} ({event.activity_type})") | |
| print("β" * 60) | |
| elif event.event_type == TraceEventType.ACTIVITY_COMPLETED: | |
| print("β" * 60) | |
| print(f"[{timestamp}] β {event.activity_name} completed") | |
| elif event.event_type == TraceEventType.ACTIVITY_PROGRESS: | |
| pct = int((event.progress or 0) * 100) | |
| bar = "β" * (pct // 5) + "β" * (20 - pct // 5) | |
| msg = event.progress_message or "" | |
| print(f"\r[{timestamp}] [{bar}] {pct}% {msg}", end="", flush=True) | |
| if pct >= 100: | |
| print() | |
| elif event.event_type == TraceEventType.DOCUMENT_TOUCHED: | |
| for span in event.spans: | |
| snippet = self._truncate(span.text) | |
| print(f"[{timestamp}] π {span.document_name}", end="") | |
| if span.field_name: | |
| print(f"[{span.field_name}]", end="") | |
| if span.row_index >= 0: | |
| print(f" row={span.row_index}", end="") | |
| if self.show_snippets and snippet: | |
| print(f"\n ββ \"{snippet}\"") | |
| else: | |
| print() | |
| elif event.event_type == TraceEventType.SPAN_HIGHLIGHTED: | |
| for span in event.spans: | |
| snippet = self._truncate(span.text) | |
| conf = f"{span.confidence:.0%}" if span.confidence < 1.0 else "" | |
| print(f"[{timestamp}] π [{span.highlight_type}] {conf}") | |
| if self.show_snippets and snippet: | |
| print(f" ββ \"{snippet}\"") | |
| elif event.event_type == TraceEventType.ASSOCIATION_CREATED: | |
| assoc = event.association | |
| if assoc: | |
| src = self._truncate(assoc.source.text, 40) | |
| tgt = self._truncate(assoc.target.text, 40) | |
| print(f"[{timestamp}] π {assoc.association_type} ({assoc.confidence:.0%})") | |
| print(f" ββ \"{src}\"") | |
| print(f" ββ \"{tgt}\"") | |
| if assoc.reason: | |
| print(f" ({assoc.reason})") | |
| elif event.event_type == TraceEventType.ENTITY_CREATED: | |
| name = event.metadata.get("name", event.entity_id) | |
| count = event.metadata.get("record_count", "?") | |
| print(f"[{timestamp}] β¦ Entity created: {name} ({count} records)") | |
| elif event.event_type == TraceEventType.ENTITY_DERIVED: | |
| name = event.metadata.get("name", event.entity_id) | |
| sources = event.metadata.get("sources", []) | |
| print(f"[{timestamp}] ‡ Entity derived: {name} β {len(sources)} sources") | |
| def _truncate(self, text: str, max_len: int = None) -> str: | |
| max_len = max_len or self.max_snippet_len | |
| if not text: | |
| return "" | |
| text = text.replace("\n", " ").strip() | |
| if len(text) > max_len: | |
| return text[:max_len-3] + "..." | |
| return text | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONVENIENCE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def create_live_tracer(observer=None, console: bool = False) -> LiveDocumentTracer: | |
| """ | |
| Create a live document tracer. | |
| Args: | |
| observer: DatasetObserver to hook into | |
| console: If True, attach console renderer | |
| Returns: | |
| Configured LiveDocumentTracer | |
| """ | |
| tracer = LiveDocumentTracer(observer) | |
| if console: | |
| renderer = ConsoleTraceRenderer() | |
| tracer.on_event(renderer.render) | |
| return tracer | |