""" Semi-Automated Labeling Pipeline. Production-grade labeling system with regex auto-extraction, confidence scoring, and interactive review capabilities. Features: - Auto-extract entities using patterns - Confidence-based auto-verification - Interactive CLI for review - Batch processing - Export to training format Example: >>> from src.data.labeling import LabelingPipeline >>> pipeline = LabelingPipeline() >>> >>> # Add raw text >>> pipeline.add_text("Rs.500 debited from account 1234") >>> >>> # Review pending >>> pipeline.interactive_review() >>> >>> # Export >>> pipeline.export_training_data("data/training/labeled.jsonl") Author: Ranjit Behera License: MIT """ from __future__ import annotations import json import logging from dataclasses import dataclass, field, asdict from datetime import datetime from pathlib import Path from typing import ( Any, Callable, Dict, Generator, List, Optional, Tuple, Union, ) from src.data.extractor import EntityExtractor, FinancialEntity # Configure logging logger = logging.getLogger(__name__) @dataclass class LabeledExample: """ A labeled training example with verification status. Attributes: id: Unique example identifier. source: Data source (email, pdf, manual). raw_text: Original text content. subject: Email subject or generated title. entities: Extracted/labeled entities. verified: Whether human verified. confidence: Auto-extraction confidence. created_at: Creation timestamp. verified_at: Verification timestamp. notes: Optional review notes. """ id: int source: str raw_text: str subject: str entities: Dict[str, Any] verified: bool = False confidence: float = 0.0 created_at: str = field(default_factory=lambda: datetime.now().isoformat()) verified_at: Optional[str] = None notes: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return {k: v for k, v in asdict(self).items() if v is not None} def to_training_format(self) -> Dict[str, str]: """Convert to training JSONL format.""" prompt = ( f"Extract financial entities from this email:\n\n" f"Subject: {self.subject}\n\n" f"Body: {self.raw_text}" ) completion = json.dumps(self.entities, indent=2) return {"prompt": prompt, "completion": completion} def display(self) -> str: """Return formatted display string.""" lines = [ f"ID: {self.id} | Confidence: {self.confidence:.0%} | Verified: {self.verified}", f"Source: {self.source}", f"", f"šŸ“§ Subject: {self.subject}", f"", f"šŸ“ Text:", f" {self.raw_text[:300]}{'...' if len(self.raw_text) > 300 else ''}", f"", f"šŸ” Entities:", ] for k, v in self.entities.items(): lines.append(f" {k}: {v}") return "\n".join(lines) class LabelingPipeline: """ Semi-automated labeling pipeline with interactive review. This pipeline automates entity extraction using regex patterns and allows human verification for low-confidence extractions. Workflow: 1. Add raw text/emails/PDFs 2. Auto-extract entities with confidence scoring 3. High confidence (>80%) auto-verified 4. Low confidence flagged for review 5. Interactive CLI for human verification 6. Export verified data to training format Attributes: data_dir: Directory for storing labeled data. extractor: EntityExtractor instance. examples: List of labeled examples. Example: >>> pipeline = LabelingPipeline("data/labeling") >>> >>> # Add samples >>> pipeline.add_text( ... "Rs.500 debited from A/c 1234 on 01-01-26", ... source="email" ... ) >>> >>> # Check status >>> print(pipeline.get_stats()) >>> >>> # Review pending >>> pipeline.interactive_review() """ # Confidence thresholds AUTO_VERIFY_THRESHOLD: float = 0.8 HIGH_PRIORITY_THRESHOLD: float = 0.5 def __init__( self, data_dir: Union[str, Path] = "data/labeling", auto_load: bool = True ) -> None: """ Initialize the labeling pipeline. Args: data_dir: Directory for labeled data storage. auto_load: Load existing data on init. """ self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) self.extractor = EntityExtractor() self.examples: List[LabeledExample] = [] self._next_id = 1 # File paths self.data_file = self.data_dir / "labeled_data.json" self.export_dir = self.data_dir / "exports" self.export_dir.mkdir(exist_ok=True) if auto_load: self._load() logger.info(f"LabelingPipeline initialized: {self.data_dir}") def _load(self) -> None: """Load existing labeled data.""" if self.data_file.exists(): try: with open(self.data_file) as f: data = json.load(f) for item in data.get("examples", []): self.examples.append(LabeledExample(**item)) if self.examples: self._next_id = max(e.id for e in self.examples) + 1 logger.info(f"Loaded {len(self.examples)} examples") except Exception as e: logger.warning(f"Failed to load data: {e}") def _save(self) -> None: """Save labeled data to file.""" data = { "examples": [e.to_dict() for e in self.examples], "stats": self.get_stats(), "last_updated": datetime.now().isoformat(), } with open(self.data_file, "w") as f: json.dump(data, f, indent=2) logger.debug(f"Saved {len(self.examples)} examples") def add_text( self, text: str, source: str = "manual", subject: Optional[str] = None, auto_save: bool = True ) -> LabeledExample: """ Add raw text for labeling. Text is auto-processed to extract entities and assigned a confidence score. Args: text: Raw transaction text. source: Source identifier (email, pdf, manual). subject: Email subject (auto-generated if None). auto_save: Save after adding. Returns: LabeledExample: The created example. """ # Extract entities result = self.extractor.extract(text) entities = result.to_dict() confidence = result.confidence_score() # Generate subject if not provided if not subject: txn_type = entities.get("type", "Transaction").capitalize() amount = entities.get("amount", "") subject = f"{txn_type} Alert - Rs.{amount}" example = LabeledExample( id=self._next_id, source=source, raw_text=text.strip(), subject=subject, entities=entities, verified=confidence >= self.AUTO_VERIFY_THRESHOLD, confidence=confidence, ) self.examples.append(example) self._next_id += 1 if auto_save: self._save() logger.debug(f"Added example {example.id} (confidence: {confidence:.0%})") return example def add_batch( self, items: List[Dict[str, str]], source: str = "batch" ) -> int: """ Add multiple items for labeling. Args: items: List of dicts with 'text' and optional 'subject'. source: Source identifier. Returns: Number of items added. """ added = 0 for item in items: text = item.get("text", "").strip() if text: self.add_text( text=text, source=source, subject=item.get("subject"), auto_save=False ) added += 1 self._save() logger.info(f"Added {added} examples from batch") return added def get_pending_review(self) -> List[LabeledExample]: """Get examples that need human review.""" return [e for e in self.examples if not e.verified] def get_high_priority_review(self) -> List[LabeledExample]: """Get low-confidence examples needing review.""" return [ e for e in self.examples if not e.verified and e.confidence < self.HIGH_PRIORITY_THRESHOLD ] def verify( self, example_id: int, corrected_entities: Optional[Dict[str, Any]] = None, notes: Optional[str] = None ) -> bool: """ Verify or correct an example. Args: example_id: Example ID to verify. corrected_entities: Optional corrected entities. notes: Optional notes about the verification. Returns: True if example found and verified. """ for example in self.examples: if example.id == example_id: if corrected_entities: example.entities = corrected_entities example.verified = True example.confidence = 1.0 example.verified_at = datetime.now().isoformat() if notes: example.notes = notes self._save() logger.info(f"Verified example {example_id}") return True return False def reject(self, example_id: int) -> bool: """ Reject and remove an example. Args: example_id: Example to reject. Returns: True if found and removed. """ for i, example in enumerate(self.examples): if example.id == example_id: del self.examples[i] self._save() logger.info(f"Rejected example {example_id}") return True return False def get_stats(self) -> Dict[str, Any]: """Get labeling statistics.""" total = len(self.examples) verified = sum(1 for e in self.examples if e.verified) by_source: Dict[str, int] = {} for e in self.examples: by_source[e.source] = by_source.get(e.source, 0) + 1 return { "total": total, "verified": verified, "pending": total - verified, "auto_verified": sum( 1 for e in self.examples if e.verified and e.verified_at is None ), "human_verified": sum( 1 for e in self.examples if e.verified and e.verified_at is not None ), "avg_confidence": ( sum(e.confidence for e in self.examples) / total if total else 0 ), "by_source": by_source, } def export_training_data( self, output_name: str = "labeled", verified_only: bool = True, train_split: float = 0.9 ) -> Tuple[Optional[Path], Optional[Path]]: """ Export to training format (JSONL). Args: output_name: Base name for output files. verified_only: Only export verified examples. train_split: Train/validation split ratio. Returns: Tuple of (train_path, valid_path). """ data = self.examples if verified_only: data = [e for e in data if e.verified] if not data: logger.warning("No data to export") return None, None import random random.shuffle(data) split_idx = int(len(data) * train_split) train_data = data[:split_idx] valid_data = data[split_idx:] train_file = self.export_dir / f"{output_name}_train.jsonl" valid_file = self.export_dir / f"{output_name}_valid.jsonl" for dataset, filepath in [(train_data, train_file), (valid_data, valid_file)]: with open(filepath, "w") as f: for example in dataset: f.write(json.dumps(example.to_training_format()) + "\n") logger.info(f"Exported: {len(train_data)} train, {len(valid_data)} valid") return train_file, valid_file def interactive_review(self, limit: Optional[int] = None) -> int: """ Interactive CLI for reviewing pending examples. Args: limit: Maximum examples to review. Returns: Number of examples reviewed. """ pending = self.get_pending_review() if not pending: print("\nāœ… All examples are verified! Nothing to review.") return 0 if limit: pending = pending[:limit] print(f"\nšŸ“‹ Interactive Review Mode") print(f" {len(pending)} examples pending review") print("\nCommands:") print(" [y] Verify as correct") print(" [e] Edit entities") print(" [r] Reject/remove") print(" [s] Skip") print(" [q] Quit") print("-" * 50) reviewed = 0 for i, example in enumerate(pending): print(f"\n[{i+1}/{len(pending)}]") print("=" * 50) print(example.display()) print("=" * 50) while True: choice = input("\nAction [y/e/r/s/q]: ").strip().lower() if choice == 'y': self.verify(example.id) print("āœ… Verified") reviewed += 1 break elif choice == 'e': print("\nEnter corrected entities as JSON:") print("Example: {\"amount\": \"500\", \"type\": \"debit\"}") try: json_input = input("> ").strip() if json_input: corrected = json.loads(json_input) self.verify(example.id, corrected_entities=corrected) print("āœ… Corrected and verified") reviewed += 1 except json.JSONDecodeError: print("āŒ Invalid JSON, skipping") break elif choice == 'r': self.reject(example.id) print("šŸ—‘ļø Rejected") reviewed += 1 break elif choice == 's': print("ā­ļø Skipped") break elif choice == 'q': print(f"\nšŸ’¾ Saved. Reviewed {reviewed} examples.") return reviewed else: print("Invalid choice, try again") print(f"\nāœ… Review complete! {reviewed} examples processed.") return reviewed def print_summary(self) -> None: """Print labeling summary.""" stats = self.get_stats() print("\n" + "=" * 50) print("šŸ“Š Labeling Pipeline Summary") print("=" * 50) print(f"Total Examples: {stats['total']}") print(f"Verified: {stats['verified']} ({stats['verified']/stats['total']*100:.0f}%)" if stats['total'] else "Verified: 0") print(f" Auto-verified: {stats['auto_verified']}") print(f" Human-verified: {stats['human_verified']}") print(f"Pending Review: {stats['pending']}") print(f"Avg Confidence: {stats['avg_confidence']:.0%}") print("\nBy Source:") for source, count in stats['by_source'].items(): print(f" {source:15} {count}") print("=" * 50) if __name__ == "__main__": # Demo usage pipeline = LabelingPipeline() # Add sample transactions samples = [ "HDFC Bank: Rs.2500.00 debited from A/c **3545 on 05-01-26 to VPA swiggy@ybl. Ref: 123456", "Dear Customer, INR 45000 credited to A/c 7890 on 04-01-26. Salary from ACME Corp.", "SBI: Rs.1500 debited from a/c XX1234 on 03-01-26. UPI txn to amazon@apl. Ref: 987654", "You paid Rs.599 to Uber from HDFC Bank. Txn ID: 456789.", ] print("Adding sample transactions...") for sample in samples: example = pipeline.add_text(sample, source="demo") print(f" Added #{example.id}: confidence={example.confidence:.0%}") # Show summary pipeline.print_summary() # Export train, valid = pipeline.export_training_data() if train: print(f"\nExported to: {train}")