Production Data Engineering Pipeline
A modular, production-ready data engineering pipeline built with Python. Designed for ML data preparation workflows with built-in validation, deduplication, quality filtering, and monitoring.
Architecture
data_pipeline/
├── config.py      # PipelineConfig (env-driven via Pydantic BaseSettings)
├── schemas.py     # Data contracts (Pydantic + Pandera)
├── stages.py      # Pipeline stages (Ingest → Validate → Transform → Dedup → Quality → Load)
├── monitoring.py  # Structured JSON logging + metrics + alerts
├── runner.py      # Pipeline orchestrator with checkpointing
└── main.py        # CLI entry point
Features
| Feature | Implementation |
|---|---|
| Schema Validation | Pydantic v2 (record-level) + Pandera (DataFrame-level) |
| Deduplication | Exact (SHA-256 hash) + Near-dedup (MinHash LSH) |
| Quality Filtering | Gopher/FineWeb-style heuristics (length, score, repetition) |
| Retry Logic | Tenacity with exponential backoff on I/O operations |
| Structured Logging | JSON-formatted logs for production monitoring |
| Metrics & Alerts | Per-stage metrics with threshold-based alerting |
| Checkpointing | Resume from last successful stage on failure |
| Multiple Outputs | Parquet, JSONL, or direct HF Hub push |
| 12-Factor Config | All settings via environment variables |
Quick Start
Installation
pip install pydantic pydantic-settings pandera tenacity pandas pyarrow datasets huggingface_hub
# Optional for near-dedup:
pip install datasketch
Run Demo
# Run with generated sample data
python -m data_pipeline.main --demo --demo-size 200
# Run with custom data
python -m data_pipeline.main --source data/raw/ --output data/processed/
# Push to HF Hub
python -m data_pipeline.main --source data/raw/ --output-format hf_hub --hf-repo myorg/clean-dataset
Environment Configuration
export PIPELINE_SOURCE_PATH=data/raw/
export PIPELINE_OUTPUT_PATH=data/processed/
export PIPELINE_OUTPUT_FORMAT=parquet
export PIPELINE_MIN_TEXT_LENGTH=50
export PIPELINE_MIN_QUALITY_SCORE=0.5
export PIPELINE_DEDUP_ENABLED=true
export PIPELINE_LOG_LEVEL=INFO
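To illustrate how the `PIPELINE_`-prefixed variables above map onto typed settings, here is a minimal, dependency-free sketch. The project itself uses Pydantic BaseSettings; the `EnvConfig` class, the `load_from_env` helper, and the defaults shown are hypothetical names and values chosen for illustration only.

```python
import os
from dataclasses import dataclass, fields


@dataclass
class EnvConfig:
    # Defaults are illustrative assumptions, not the project's real defaults.
    source_path: str = "data/raw/"
    output_path: str = "data/processed/"
    output_format: str = "parquet"
    min_text_length: int = 50
    min_quality_score: float = 0.5
    dedup_enabled: bool = True
    log_level: str = "INFO"


def _parse(raw: str, target_type: type):
    """Convert an environment string to the type of the field's default."""
    if target_type is bool:
        return raw.strip().lower() in {"1", "true", "yes", "on"}
    return target_type(raw)


def load_from_env(environ=None, prefix: str = "PIPELINE_") -> EnvConfig:
    """Build a config from variables such as PIPELINE_MIN_TEXT_LENGTH."""
    environ = os.environ if environ is None else environ
    overrides = {}
    for field in fields(EnvConfig):
        raw = environ.get(prefix + field.name.upper())
        if raw is not None:
            overrides[field.name] = _parse(raw, type(field.default))
    return EnvConfig(**overrides)
```

Calling `load_from_env()` with no arguments reads from the real process environment; passing a plain dict makes the mapping easy to test.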
Usage in Code
from data_pipeline import PipelineConfig, PipelineRunner
import pandas as pd
# Configure
config = PipelineConfig(
    source_path="data/raw/",
    output_path="data/processed/",
    output_format="parquet",
    min_quality_score=0.6,
    dedup_enabled=True,
)
# Run
runner = PipelineRunner(config)
output_data, metrics = runner.run()
# Check results
print(f"Status: {metrics.status}")
print(f"Records: {metrics.total_records_in} → {metrics.total_records_out}")
print(f"Duration: {metrics.total_duration_seconds:.1f}s")
Custom Stages
from data_pipeline import PipelineRunner
from data_pipeline.stages import PipelineStage
from data_pipeline.config import PipelineConfig
import pandas as pd
class MyCustomStage(PipelineStage):
    def __init__(self, config: PipelineConfig):
        super().__init__("my_custom_stage", config)
    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        # Your logic here
        data["new_column"] = data["text"].str.len()
        return data
# Add to pipeline (assumes `config` and an input DataFrame `df` already exist)
runner = PipelineRunner(config)
runner.stages.insert(3, MyCustomStage(config))  # Insert after transform
output, metrics = runner.run(input_data=df)
Pipeline Stages
1. Ingest
- Loads data from JSONL, CSV, Parquet, or HF Hub
- Retry logic on I/O failures (3 attempts, exponential backoff)
- Supports directory scanning with glob patterns
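The retry behaviour described for Ingest (3 attempts, exponential backoff) can be sketched with the standard library alone. The project uses Tenacity for this; the `retry_io` decorator and its `base_delay` and `sleep` parameters are hypothetical and only illustrate the pattern.

```python
import time
from functools import wraps


def retry_io(attempts: int = 3, base_delay: float = 1.0, sleep=time.sleep):
    """Retry a function on OSError, doubling the wait after each failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except OSError:
                    if attempt == attempts:
                        raise  # out of attempts: surface the original error
                    # Wait base_delay, then 2x, then 4x, ...
                    sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator


@retry_io(attempts=3, base_delay=1.0)
def read_bytes(path: str) -> bytes:
    """Example I/O call wrapped with the retry policy."""
    with open(path, "rb") as handle:
        return handle.read()
```

The `sleep` parameter is injectable so the backoff schedule can be checked without actually waiting.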
2. Validate
- Per-record validation via Pydantic models
- Control character removal, whitespace stripping
- Quality gate: rejects short + low-quality records
- Invalid records logged and dropped (soft failure)
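A rough sketch of the soft-failure validation described above, written in plain Python rather than with Pydantic models. The function names, the `min_length` parameter, and the exact set of control characters removed are assumptions made for illustration.

```python
import re

# C0/C1 control characters, excluding tab, newline, and carriage return.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")


def clean_text(text: str) -> str:
    """Remove control characters and strip surrounding whitespace."""
    return _CONTROL_CHARS.sub("", text).strip()


def validate_records(records, min_length: int = 1):
    """Split records into (valid, rejected) without raising on bad input."""
    valid, rejected = [], []
    for record in records:
        text = record.get("text")
        if not isinstance(text, str):
            rejected.append((record, "text missing or not a string"))
            continue
        cleaned = clean_text(text)
        if len(cleaned) < min_length:
            rejected.append((record, "text too short after cleaning"))
            continue
        valid.append({**record, "text": cleaned})
    return valid, rejected
```

Returning the rejected records with a reason, instead of raising, keeps the run going while still leaving something to log.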
3. Transform
- Unicode normalization (NFC)
- Whitespace cleanup
- Computes: text_length, word_count, content_hash
- Fills missing quality scores with neutral default
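The Transform steps above can be sketched per record as follows. This is a minimal illustration under stated assumptions: the neutral default of 0.5, the `quality_score` field name, and the `transform_record` function are hypothetical, and the real stage operates on DataFrames.

```python
import hashlib
import re
import unicodedata

NEUTRAL_QUALITY = 0.5  # assumed value for the "neutral default"


def transform_record(record: dict) -> dict:
    """Normalize text and attach derived columns."""
    text = unicodedata.normalize("NFC", record["text"])
    # Collapse runs of spaces and tabs; newlines are left intact.
    text = re.sub(r"[ \t]+", " ", text).strip()
    quality = record.get("quality_score")
    return {
        **record,
        "text": text,
        "text_length": len(text),
        "word_count": len(text.split()),
        "content_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),
        "quality_score": NEUTRAL_QUALITY if quality is None else quality,
    }
```

Hashing after normalization means two records that differ only in Unicode composition or spacing receive the same `content_hash`.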
4. Deduplicate
- Exact dedup: SHA-256 hash, O(n), catches identical documents
- Near-dedup: MinHash LSH, catches paraphrases/minor edits
- Configurable threshold (default: 0.8 Jaccard)
- 5-gram shingling (proven by FineWeb2)
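To make the two dedup modes concrete, here is a small sketch. Exact dedup uses SHA-256 as described. For near-dedup, the sketch computes exact Jaccard similarity over word-level 5-gram shingles with pairwise comparison, which is a simpler stand-in for MinHash LSH and does not scale the same way. Word-level shingling and all function names are assumptions.

```python
import hashlib


def exact_dedup(texts):
    """Keep the first occurrence of each byte-identical text."""
    seen, kept = set(), []
    for text in texts:
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            kept.append(text)
    return kept


def shingles(text: str, n: int = 5) -> set:
    """Return the set of word n-grams (assumed word-level shingling)."""
    words = text.lower().split()
    if len(words) < n:
        return {" ".join(words)}
    return {" ".join(words[i:i + n]) for i in range(len(words) - n + 1)}


def jaccard(a: set, b: set) -> float:
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def near_dedup(texts, threshold: float = 0.8):
    """Drop any text whose similarity to an already-kept text reaches the threshold."""
    kept, kept_shingles = [], []
    for text in texts:
        current = shingles(text)
        if all(jaccard(current, other) < threshold for other in kept_shingles):
            kept.append(text)
            kept_shingles.append(current)
    return kept
```

The pairwise loop is O(n²) in the number of kept documents; MinHash LSH exists precisely to avoid that cost by only comparing documents that land in the same hash bucket.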
5. Quality Filter
- Minimum/maximum text length
- Quality score threshold
- Word count minimum
- Repetition ratio (duplicate line detection)
- Per-filter impact logging
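The filters listed above, including a repetition ratio based on duplicate lines and a per-filter impact count, might look like the sketch below. Only the minimum length of 50 and the score threshold of 0.5 come from the environment example earlier; every other threshold, the reason labels, and the function names are assumptions.

```python
from collections import Counter


def repetition_ratio(text: str) -> float:
    """Fraction of non-empty lines that repeat an earlier line."""
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    if not lines:
        return 0.0
    duplicates = sum(count - 1 for count in Counter(lines).values())
    return duplicates / len(lines)


def rejection_reason(record, *, min_length=50, max_length=100_000,
                     min_score=0.5, min_words=5, max_repetition=0.3):
    """Return the first filter a record fails, or None if it passes all."""
    text = record["text"]
    if len(text) < min_length:
        return "too_short"
    if len(text) > max_length:
        return "too_long"
    if record.get("quality_score", 0.0) < min_score:
        return "low_quality_score"
    if len(text.split()) < min_words:
        return "too_few_words"
    if repetition_ratio(text) > max_repetition:
        return "too_repetitive"
    return None


def quality_filter(records, **thresholds):
    """Return (kept_records, impact) where impact counts drops per filter."""
    kept, impact = [], Counter()
    for record in records:
        reason = rejection_reason(record, **thresholds)
        if reason is None:
            kept.append(record)
        else:
            impact[reason] += 1
    return kept, impact
```

Because each record is attributed to the first filter it fails, the impact counts depend on the order of the checks.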
6. Load
- Parquet: Snappy-compressed, columnar (best for analytics)
- JSONL: Human-readable, streaming-friendly
- HF Hub: Direct push with retry logic
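Of the three output formats, JSONL is the simplest to sketch without extra dependencies. The helpers below are hypothetical (`write_jsonl` and `read_jsonl` are not names taken from the project) and show why the format is streaming-friendly: each line is an independent JSON document.

```python
import json
from pathlib import Path


def write_jsonl(records, path):
    """Write one JSON object per line, creating parent directories as needed."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")
    return path


def read_jsonl(path):
    """Yield records one at a time without loading the whole file."""
    with Path(path).open(encoding="utf-8") as handle:
        for line in handle:
            if line.strip():
                yield json.loads(line)
```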
Monitoring
Structured Logs
{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "INFO",
  "logger": "pipeline.transform",
  "message": "Stage complete",
  "metrics": {
    "stage_name": "transform",
    "records_in": 1000,
    "records_out": 1000,
    "pass_rate": 1.0,
    "duration_seconds": 2.3
  }
}
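A log line of this shape can be produced with the standard `logging` module and a custom formatter. This is a sketch, not the project's `monitoring.py`: the `JsonFormatter` class, the `get_json_logger` helper, and the convention of passing metrics through `extra` are assumptions.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc)
        payload = {
            "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Metrics arrive via logger.info(..., extra={"metrics": {...}}).
        metrics = getattr(record, "metrics", None)
        if metrics is not None:
            payload["metrics"] = metrics
        return json.dumps(payload)


def get_json_logger(name: str, stream=None) -> logging.Logger:
    """Return a logger that writes JSON lines to the given stream (stderr by default)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    if not logger.handlers:
        handler = logging.StreamHandler(stream)
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    return logger
```

Usage would look like `get_json_logger("pipeline.transform").info("Stage complete", extra={"metrics": {"records_in": 1000}})`.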
Alerts
Automatic alerts when:
- Failure rate > 20% (configurable)
- Pass rate < 50%
- Stage takes > 5 minutes
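The three alert conditions can be expressed as a small threshold check. In this sketch the `StageMetrics` dataclass, the `records_failed` field, and the definitions of failure rate and pass rate as fractions of `records_in` are assumptions; the thresholds mirror the list above.

```python
from dataclasses import dataclass


@dataclass
class StageMetrics:
    stage_name: str
    records_in: int
    records_out: int
    records_failed: int  # hypothetical field used to derive the failure rate
    duration_seconds: float


def check_alerts(metrics: StageMetrics, max_failure_rate: float = 0.20,
                 min_pass_rate: float = 0.50,
                 max_duration_seconds: float = 300.0) -> list:
    """Return a human-readable message for each threshold that is breached."""
    alerts = []
    if metrics.records_in > 0:
        failure_rate = metrics.records_failed / metrics.records_in
        pass_rate = metrics.records_out / metrics.records_in
        if failure_rate > max_failure_rate:
            alerts.append(f"{metrics.stage_name}: failure rate {failure_rate:.0%}")
        if pass_rate < min_pass_rate:
            alerts.append(f"{metrics.stage_name}: pass rate {pass_rate:.0%}")
    if metrics.duration_seconds > max_duration_seconds:
        alerts.append(f"{metrics.stage_name}: took {metrics.duration_seconds:.0f}s")
    return alerts
```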
Run Metrics
Full JSON metrics report saved per run at data/metrics/run_<id>.json.
Testing
pip install pytest
pytest tests/ -v
Production Deployment Tips
- Use environment variables for all configuration (12-factor)
- Monitor the JSON logs with your log aggregator (ELK, CloudWatch, etc.)
- Set up alerts on failure_rate and duration thresholds
- Rely on checkpointing for long-running pipelines (enabled automatically)
- Scale dedup with datasketch disk storage for >10M records
- Schedule with cron, Airflow, or Prefect for recurring ingestion
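As an illustration of the checkpointing tip, the sketch below resumes from the last successful stage after a failure. It is a simplified model, not the project's `runner.py`: stages are plain functions, the data must be JSON-serializable, and the checkpoint file layout and the `run_with_checkpoints` name are assumptions.

```python
import json
from pathlib import Path


def run_with_checkpoints(stages, data, checkpoint_path):
    """Run (name, function) stages in order, persisting progress after each one."""
    checkpoint_path = Path(checkpoint_path)
    start = 0
    if checkpoint_path.exists():
        # A previous run stopped partway: reload its output and skip finished stages.
        saved = json.loads(checkpoint_path.read_text(encoding="utf-8"))
        data = saved["data"]
        start = saved["next_stage"]
    for index in range(start, len(stages)):
        name, stage = stages[index]
        data = stage(data)  # an exception here leaves the previous checkpoint in place
        checkpoint_path.write_text(
            json.dumps({"next_stage": index + 1, "last_completed": name, "data": data}),
            encoding="utf-8",
        )
    checkpoint_path.unlink(missing_ok=True)  # finished cleanly: clear for the next run
    return data
```

For DataFrame-sized data a real implementation would more likely checkpoint to Parquet than embed the data in JSON; the control flow stays the same.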
License
MIT
Generated by ML Intern
This model repository was generated by ML Intern, an agent for machine learning research and development on the Hugging Face Hub.
- Try ML Intern: https://smolagents-ml-intern.hf.space
- Source code: https://github.com/huggingface/ml-intern