Production Data Engineering Pipeline

A modular, production-ready data engineering pipeline built with Python. Designed for ML data preparation workflows with built-in validation, deduplication, quality filtering, and monitoring.

Architecture

data_pipeline/
├── config.py          # PipelineConfig (env-driven via Pydantic BaseSettings)
├── schemas.py         # Data contracts (Pydantic + Pandera)
├── stages.py          # Pipeline stages (Ingest → Validate → Transform → Dedup → Quality → Load)
├── monitoring.py      # Structured JSON logging + metrics + alerts
├── runner.py          # Pipeline orchestrator with checkpointing
└── main.py            # CLI entry point

Features

| Feature            | Implementation                                               |
|--------------------|--------------------------------------------------------------|
| Schema Validation  | Pydantic v2 (record-level) + Pandera (DataFrame-level)       |
| Deduplication      | Exact (SHA-256 hash) + Near-dedup (MinHash LSH)              |
| Quality Filtering  | Gopher/FineWeb-style heuristics (length, score, repetition)  |
| Retry Logic        | Tenacity with exponential backoff on I/O operations          |
| Structured Logging | JSON-formatted logs for production monitoring                |
| Metrics & Alerts   | Per-stage metrics with threshold-based alerting              |
| Checkpointing      | Resume from last successful stage on failure                 |
| Multiple Outputs   | Parquet, JSONL, or direct HF Hub push                        |
| 12-Factor Config   | All settings via environment variables                       |

Quick Start

Installation

pip install pydantic pydantic-settings pandera tenacity pandas pyarrow datasets huggingface_hub
# Optional for near-dedup:
pip install datasketch

Run Demo

# Run with generated sample data
python -m data_pipeline.main --demo --demo-size 200

# Run with custom data
python -m data_pipeline.main --source data/raw/ --output data/processed/

# Push to HF Hub
python -m data_pipeline.main --source data/raw/ --output-format hf_hub --hf-repo myorg/clean-dataset

Environment Configuration

export PIPELINE_SOURCE_PATH=data/raw/
export PIPELINE_OUTPUT_PATH=data/processed/
export PIPELINE_OUTPUT_FORMAT=parquet
export PIPELINE_MIN_TEXT_LENGTH=50
export PIPELINE_MIN_QUALITY_SCORE=0.5
export PIPELINE_DEDUP_ENABLED=true
export PIPELINE_LOG_LEVEL=INFO
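PipelineConfig is described as env-driven via Pydantic BaseSettings, presumably with a `PIPELINE_` env prefix (an assumption inferred from the variable names above). The sketch below mimics that field-to-variable mapping with only the standard library so the rule is visible; `SketchConfig` and `load_config` are hypothetical names, and the default values are illustrative.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping

# Hypothetical prefix, inferred from the PIPELINE_* variables in this README.
ENV_PREFIX = "PIPELINE_"


@dataclass
class SketchConfig:
    """Stdlib stand-in for a BaseSettings class; defaults are illustrative."""
    source_path: str = "data/raw/"
    output_path: str = "data/processed/"
    output_format: str = "parquet"
    min_text_length: int = 50
    min_quality_score: float = 0.5
    dedup_enabled: bool = True
    log_level: str = "INFO"


def _coerce(raw: str, target: type):
    """Convert an environment string to the field's declared type."""
    if target is bool:
        return raw.strip().lower() in {"1", "true", "yes", "on"}
    return target(raw)


def load_config(environ: Mapping[str, str] = os.environ) -> SketchConfig:
    """Build a config where PIPELINE_<FIELD> overrides each default."""
    overrides = {}
    for f in fields(SketchConfig):
        key = ENV_PREFIX + f.name.upper()
        if key in environ:
            overrides[f.name] = _coerce(environ[key], f.type)
    return SketchConfig(**overrides)


cfg = load_config({"PIPELINE_MIN_QUALITY_SCORE": "0.6", "PIPELINE_DEDUP_ENABLED": "false"})
print(cfg.min_quality_score, cfg.dedup_enabled)  # 0.6 False
```

pydantic-settings performs the same lookup (plus type validation) when a settings class declares an env prefix; the real PipelineConfig may use different field names or defaults.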

Usage in Code

from data_pipeline import PipelineConfig, PipelineRunner
import pandas as pd

# Configure
config = PipelineConfig(
    source_path="data/raw/",
    output_path="data/processed/",
    output_format="parquet",
    min_quality_score=0.6,
    dedup_enabled=True,
)

# Run
runner = PipelineRunner(config)
output_data, metrics = runner.run()

# Check results
print(f"Status: {metrics.status}")
print(f"Records: {metrics.total_records_in} → {metrics.total_records_out}")
print(f"Duration: {metrics.total_duration_seconds:.1f}s")

Custom Stages

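from data_pipeline import PipelineRunner
from data_pipeline.stages import PipelineStage
from data_pipeline.config import PipelineConfig
import pandas as pd

class MyCustomStage(PipelineStage):
    """Example stage that adds a character-count column."""

    def __init__(self, config: PipelineConfig):
        super().__init__("my_custom_stage", config)

    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        # Work on a copy so the caller's DataFrame is not mutated in place
        data = data.copy()
        data["new_column"] = data["text"].str.len()
        return data

# Add to pipeline
config = PipelineConfig(source_path="data/raw/", output_path="data/processed/")
df = pd.read_json("data/raw/sample.jsonl", lines=True)  # hypothetical input file
runner = PipelineRunner(config)
# Default order: ingest (0), validate (1), transform (2), dedup (3), ...
runner.stages.insert(3, MyCustomStage(config))  # Runs after transform, before dedup
output, metrics = runner.run(input_data=df)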
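The runner is said to checkpoint and resume from the last successful stage. That mechanism isn't documented here, so the following is a hypothetical stdlib sketch that uses plain `(name, function)` pairs over a list of records instead of `PipelineStage` objects and DataFrames. Each stage's output is saved before the progress marker is advanced, so a crash never leaves the marker pointing at missing output.

```python
import json
import os
from typing import Callable, List, Tuple

Stage = Tuple[str, Callable[[list], list]]


def run_with_checkpoints(stages: List[Stage], data: list, checkpoint_dir: str) -> list:
    """Run stages in order; on rerun, resume after the last completed stage."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    state_path = os.path.join(checkpoint_dir, "state.json")
    start = 0
    if os.path.exists(state_path):
        with open(state_path, encoding="utf-8") as fh:
            start = json.load(fh)["completed"]
        if start > 0:
            # Resume from the saved output of the last completed stage; `data` is ignored.
            last = os.path.join(checkpoint_dir, f"stage_{start - 1}.json")
            with open(last, encoding="utf-8") as fh:
                data = json.load(fh)
    for idx in range(start, len(stages)):
        name, fn = stages[idx]
        data = fn(data)
        # 1) Persist this stage's output, 2) only then advance the progress marker.
        out = os.path.join(checkpoint_dir, f"stage_{idx}.json")
        with open(out, "w", encoding="utf-8") as fh:
            json.dump(data, fh)
        with open(state_path, "w", encoding="utf-8") as fh:
            json.dump({"completed": idx + 1, "last_stage": name}, fh)
    return data
```

In this sketch the state file must be cleared (or keyed by run ID) before a fresh run, otherwise the next call resumes from the old checkpoint.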

Pipeline Stages

1. Ingest

  • Loads data from JSONL, CSV, Parquet, or HF Hub
  • Retry logic on I/O failures (3 attempts, exponential backoff)
  • Supports directory scanning with glob patterns
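The retry and directory-scanning behavior above can be sketched with the standard library instead of Tenacity, which makes the backoff schedule explicit. `retry`, `read_jsonl`, and `ingest_directory` are hypothetical names; the 0.5 s base delay, 8 s cap, and retry-only-on-`OSError` policy are assumptions.

```python
import functools
import glob
import json
import os
import random
import time


def retry(attempts: int = 3, base_delay: float = 0.5, max_delay: float = 8.0,
          exceptions: tuple = (OSError,)):
    """Retry the wrapped call, doubling the delay after each failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions:
                    if attempt == attempts:
                        raise  # Out of attempts: surface the last error
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    # Small random jitter avoids synchronized retries across workers.
                    time.sleep(delay + random.uniform(0, delay / 10))
        return wrapper
    return decorator


@retry(attempts=3)
def read_jsonl(path: str) -> list:
    """Read one JSONL file, skipping blank lines."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


def ingest_directory(source_dir: str, pattern: str = "*.jsonl") -> list:
    """Load every file in source_dir matching pattern, in sorted order."""
    records = []
    for path in sorted(glob.glob(os.path.join(source_dir, pattern))):
        records.extend(read_jsonl(path))
    return records
```

Only `OSError` is retried here; a malformed line raises `json.JSONDecodeError` immediately, since re-reading the same bytes would not fix it. With Tenacity the policy would be expressed roughly as `@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, max=8))`.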

2. Validate

  • Per-record validation via Pydantic models
  • Control character removal, whitespace stripping
  • Quality gate: rejects short + low-quality records
  • Invalid records logged and dropped (soft failure)
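The cleaning and soft-failure logic of this stage can be sketched without Pydantic. `clean_text` and `validate_records` are hypothetical, and the sketch assumes the quality gate drops a record only when it is both shorter than `min_length` and scored below `min_score`.

```python
import re
from dataclasses import dataclass, field

# C0/C1 control characters, excluding tab (\x09), newline (\x0a), and carriage return (\x0d).
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")


@dataclass
class ValidationResult:
    valid: list = field(default_factory=list)
    rejected: list = field(default_factory=list)  # (record, reason) pairs


def clean_text(text: str) -> str:
    """Remove control characters, then strip surrounding whitespace."""
    return _CONTROL_CHARS.sub("", text).strip()


def validate_records(records, min_length: int = 50, min_score: float = 0.5) -> ValidationResult:
    result = ValidationResult()
    for rec in records:
        text = rec.get("text")
        if not isinstance(text, str):
            # Soft failure: record the reason and move on instead of raising.
            result.rejected.append((rec, "missing or non-string text"))
            continue
        text = clean_text(text)
        score = rec.get("quality_score")
        if len(text) < min_length and score is not None and score < min_score:
            result.rejected.append((rec, "short and low quality"))
            continue
        result.valid.append({**rec, "text": text})
    return result
```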

3. Transform

  • Unicode normalization (NFC)
  • Whitespace cleanup
  • Computes: text_length, word_count, content_hash
  • Fills missing quality scores with neutral default
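A per-record version of these transform steps is sketched below. `transform_record` is a hypothetical name; the whitespace rules (collapse runs of spaces and tabs, keep at most one blank line) are an assumption about what "whitespace cleanup" covers, and the neutral default of 0.5 is a guess.

```python
import hashlib
import re
import unicodedata

NEUTRAL_QUALITY = 0.5  # Hypothetical default for missing scores

_INLINE_WS = re.compile(r"[ \t]+")
_EXTRA_BLANK_LINES = re.compile(r"\n{3,}")


def normalize_text(text: str) -> str:
    """NFC-normalize, unify newlines, collapse inline whitespace and blank lines."""
    text = unicodedata.normalize("NFC", text)
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = "\n".join(_INLINE_WS.sub(" ", line).strip() for line in text.split("\n"))
    return _EXTRA_BLANK_LINES.sub("\n\n", text).strip()


def transform_record(rec: dict) -> dict:
    text = normalize_text(rec["text"])
    score = rec.get("quality_score")
    return {
        **rec,
        "text": text,
        "text_length": len(text),
        "word_count": len(text.split()),
        # Hash the normalized text so trivially different copies collide in exact dedup.
        "content_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),
        "quality_score": NEUTRAL_QUALITY if score is None else score,
    }
```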

4. Deduplicate

  • Exact dedup: SHA-256 hash, O(n); catches identical documents
  • Near-dedup: MinHash LSH; catches near-duplicates such as lightly edited copies
    • Configurable threshold (default: 0.8 Jaccard)
    • 5-gram shingling (following FineWeb2)
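The two dedup passes can be sketched without datasketch: exact duplicates are dropped by the SHA-256 of the text, and near-duplicates by MinHash signatures bucketed with LSH banding, then confirmed by the estimated Jaccard similarity. This is a from-scratch illustration, not the package's implementation; it assumes word-level 5-grams, 128 permutations split into 32 bands of 4 rows, and keeps the first document of each duplicate group.

```python
import hashlib
import random
import re
from collections import defaultdict

SHINGLE_SIZE = 5          # Word 5-grams
NUM_PERM = 128            # Signature length (assumed)
BANDS, ROWS = 32, 4       # LSH banding; BANDS * ROWS == NUM_PERM
_PRIME = (1 << 61) - 1    # Mersenne prime for universal hashing
_rng = random.Random(42)  # Fixed seed so signatures are reproducible
_PERMS = [(_rng.randrange(1, _PRIME), _rng.randrange(0, _PRIME)) for _ in range(NUM_PERM)]


def shingles(text: str) -> set:
    """Set of overlapping word 5-grams (the whole text if it is shorter)."""
    words = re.findall(r"\w+", text.lower())
    if len(words) < SHINGLE_SIZE:
        return {" ".join(words)}
    return {" ".join(words[i:i + SHINGLE_SIZE]) for i in range(len(words) - SHINGLE_SIZE + 1)}


def minhash(text: str) -> list:
    """One minimum per simulated permutation h(x) = (a*x + b) mod p."""
    base = [int.from_bytes(hashlib.sha1(s.encode("utf-8")).digest()[:8], "big")
            for s in shingles(text)]
    return [min((a * x + b) % _PRIME for x in base) for a, b in _PERMS]


def estimated_jaccard(sig_a: list, sig_b: list) -> float:
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)


def dedup(texts: list, threshold: float = 0.8) -> list:
    """Return indices of texts to keep after exact and near-duplicate removal."""
    seen_hashes = set()
    signatures = {}
    buckets = defaultdict(list)  # (band index, band values) -> kept indices
    kept = []
    for i, text in enumerate(texts):
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if digest in seen_hashes:
            continue  # Exact duplicate, O(1) set lookup
        seen_hashes.add(digest)
        sig = minhash(text)
        keys = [(b, tuple(sig[b * ROWS:(b + 1) * ROWS])) for b in range(BANDS)]
        # Only documents sharing at least one band are compared (the LSH step).
        candidates = {j for key in keys for j in buckets.get(key, ())}
        if any(estimated_jaccard(sig, signatures[j]) >= threshold for j in candidates):
            continue  # Near duplicate of an already-kept document
        kept.append(i)
        signatures[i] = sig
        for key in keys:
            buckets[key].append(i)
    return kept


base = " ".join(f"word{i}" for i in range(40))
near = base.replace("word39", "changed")  # True 5-gram Jaccard with base: 35/37
other = " ".join(f"other{i}" for i in range(40))
print(dedup([base, base, near, other]))  # [0, 3]
```

Banding trades recall for speed: with 4 rows per band, a pair at Jaccard 0.8 shares a given band with probability about 0.41, so across 32 bands it is almost always compared, while dissimilar pairs rarely are.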

5. Quality Filter

  • Minimum/maximum text length
  • Quality score threshold
  • Word count minimum
  • Repetition ratio (duplicate line detection)
  • Per-filter impact logging
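The filters above can be sketched as an ordered list of predicates that also counts how many records each one removes. The threshold defaults are hypothetical, repetition is measured as the fraction of non-empty lines that repeat an earlier line, and each dropped record is attributed to the first filter it fails.

```python
from collections import Counter


def duplicate_line_ratio(text: str) -> float:
    """Fraction of non-empty lines that repeat an earlier line."""
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if not lines:
        return 0.0
    duplicates = sum(count - 1 for count in Counter(lines).values())
    return duplicates / len(lines)


def quality_filter(records, min_length=50, max_length=100_000, min_score=0.5,
                   min_words=10, max_dup_line_ratio=0.3):
    """Return (kept records, {filter name: records it removed})."""
    checks = [
        ("min_length", lambda r: len(r["text"]) >= min_length),
        ("max_length", lambda r: len(r["text"]) <= max_length),
        ("quality_score", lambda r: r.get("quality_score", 0.0) >= min_score),
        ("min_words", lambda r: len(r["text"].split()) >= min_words),
        ("repetition", lambda r: duplicate_line_ratio(r["text"]) <= max_dup_line_ratio),
    ]
    impact = {name: 0 for name, _ in checks}
    kept = []
    for rec in records:
        for name, check in checks:
            if not check(rec):
                impact[name] += 1  # Attribute the drop to the first failing filter
                break
        else:
            kept.append(rec)
    return kept, impact
```

Logging `impact` after each run shows which filter is responsible for most drops, which is useful when tuning thresholds.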

6. Load

  • Parquet: Snappy-compressed, columnar (best for analytics)
  • JSONL: Human-readable, streaming-friendly
  • HF Hub: Direct push with retry logic
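For the JSONL output, a minimal stdlib writer might look like the following. Writing to a temporary file and renaming it is an assumption about how a production loader avoids leaving half-written files, not something this README states; `write_jsonl` is a hypothetical name. For Parquet, pandas' `DataFrame.to_parquet` already defaults to Snappy compression.

```python
import json
import os
import tempfile


def write_jsonl(records, path: str) -> int:
    """Write records as JSON Lines via a temp file + rename; returns the count written."""
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    count = 0
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            for rec in records:
                # ensure_ascii=False keeps non-ASCII text human-readable in the file.
                fh.write(json.dumps(rec, ensure_ascii=False) + "\n")
                count += 1
        # os.replace is atomic when source and target are on the same filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)  # Never leave a partial temp file behind
        raise
    return count
```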

Monitoring

Structured Logs

{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "INFO",
  "logger": "pipeline.transform",
  "message": "Stage complete",
  "metrics": {
    "stage_name": "transform",
    "records_in": 1000,
    "records_out": 1000,
    "pass_rate": 1.0,
    "duration_seconds": 2.3
  }
}
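A log line in this shape can be produced with the standard `logging` module and a custom formatter. This is a sketch, not the package's `monitoring.py`; `JsonFormatter` and `get_logger` are hypothetical, and passing metrics through `extra=` is an assumed convention.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        ts = datetime.fromtimestamp(record.created, tz=timezone.utc)
        payload = {
            "timestamp": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Structured fields arrive via logging's `extra=` argument.
        metrics = getattr(record, "metrics", None)
        if metrics is not None:
            payload["metrics"] = metrics
        return json.dumps(payload)


def get_logger(name: str, level: str = "INFO") -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:  # Avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(level)
    logger.propagate = False  # Don't also emit through root-logger handlers
    return logger


log = get_logger("pipeline.transform")
log.info("Stage complete", extra={"metrics": {
    "stage_name": "transform", "records_in": 1000, "records_out": 1000,
    "pass_rate": 1.0, "duration_seconds": 2.3,
}})
```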

Alerts

Automatic alerts when:

  • Failure rate > 20% (configurable)
  • Pass rate < 50%
  • Stage takes > 5 minutes
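These alert rules can be sketched as plain threshold checks over per-stage metrics. The defaults mirror the numbers listed above; the `StageMetrics` fields and the distinction between failed records (errors) and filtered records are assumptions, and `check_alerts` is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class StageMetrics:
    stage_name: str
    records_in: int
    records_out: int
    records_failed: int  # Records that errored, as opposed to being filtered out
    duration_seconds: float

    @property
    def pass_rate(self) -> float:
        return self.records_out / self.records_in if self.records_in else 1.0

    @property
    def failure_rate(self) -> float:
        return self.records_failed / self.records_in if self.records_in else 0.0


def check_alerts(m: StageMetrics, max_failure_rate: float = 0.20,
                 min_pass_rate: float = 0.50, max_duration_s: float = 300.0) -> list:
    """Return human-readable alert messages; an empty list means healthy."""
    alerts = []
    if m.failure_rate > max_failure_rate:
        alerts.append(f"{m.stage_name}: failure rate {m.failure_rate:.0%} > {max_failure_rate:.0%}")
    if m.pass_rate < min_pass_rate:
        alerts.append(f"{m.stage_name}: pass rate {m.pass_rate:.0%} < {min_pass_rate:.0%}")
    if m.duration_seconds > max_duration_s:
        alerts.append(f"{m.stage_name}: took {m.duration_seconds:.0f}s > {max_duration_s:.0f}s")
    return alerts


print(check_alerts(StageMetrics("validate", 1000, 400, 250, 400.0)))
```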

Run Metrics

Full JSON metrics report saved per run at data/metrics/run_<id>.json.
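A per-run report like this can be assembled from the stage metrics and written as JSON. The report fields reuse the names from the Usage in Code example (`total_records_in`, `total_records_out`, `total_duration_seconds`); everything else, including `save_run_metrics` and the random run ID, is a hypothetical sketch.

```python
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Optional


def save_run_metrics(stages: list, metrics_dir: str = "data/metrics",
                     run_id: Optional[str] = None) -> str:
    """Write a run report to <metrics_dir>/run_<id>.json and return its path."""
    run_id = run_id or uuid.uuid4().hex[:12]
    report = {
        "run_id": run_id,
        "finished_at": datetime.now(timezone.utc).isoformat(),
        # Records entering the first stage and leaving the last one.
        "total_records_in": stages[0]["records_in"] if stages else 0,
        "total_records_out": stages[-1]["records_out"] if stages else 0,
        "total_duration_seconds": round(sum(s["duration_seconds"] for s in stages), 3),
        "stages": stages,
    }
    os.makedirs(metrics_dir, exist_ok=True)
    path = os.path.join(metrics_dir, f"run_{run_id}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(report, fh, indent=2)
    return path
```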

Testing

pip install pytest
pytest tests/ -v

Production Deployment Tips

  1. Use environment variables for all configuration (12-factor)
  2. Monitor the JSON logs with your log aggregator (ELK, CloudWatch, etc.)
  3. Set up alerts on failure_rate and duration thresholds
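  4. Rely on checkpointing (enabled automatically) to resume long-running pipelines after a failure
  5. For >10M records, scale near-dedup by backing datasketch's MinHashLSH with an external storage backend (e.g. Redis) instead of in-memory indexes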
  6. Schedule with cron, Airflow, or Prefect for recurring ingestion

License

MIT

Generated by ML Intern

This model repository was generated by ML Intern, an agent for machine learning research and development on the Hugging Face Hub.
