LaelaZ's picture
Deploy CommercePipeline to HF Spaces (Docker)
16decd8 verified
"""Command-line entry point for the pipeline.

Examples
--------
    python -m pipeline run        # full pipeline: ingest -> load -> transform -> quality
    python -m pipeline ingest     # just regenerate raw data
    python -m pipeline load       # run only the load stage
    python -m pipeline transform  # rebuild marts from the existing warehouse
    python -m pipeline quality    # re-run the data-quality gate
"""
from __future__ import annotations
import argparse
import logging
import sys
from pipeline import ingest, load, quality, transform
from pipeline.config import get_settings
from pipeline.flow import run_pipeline
from pipeline.load import connect
def _configure_logging(verbose: bool) -> None:
    """Configure root logging for the command-line interface.

    Args:
        verbose: When true the root level is ``INFO`` so progress messages
            are shown; otherwise it is ``WARNING``.

    Records are written to ``sys.stderr`` using the bare message as the
    format. ``logging.basicConfig`` does nothing when the root logger
    already has handlers, so an earlier configuration is left untouched.
    """
    if verbose:
        threshold = logging.INFO
    else:
        threshold = logging.WARNING
    logging.basicConfig(stream=sys.stderr, format="%(message)s", level=threshold)
def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and execute the requested pipeline stage.

    Args:
        argv: Arguments to parse. ``None`` lets argparse fall back to
            ``sys.argv[1:]``.

    Returns:
        The process exit code: ``0`` when the stage completes, ``1`` when
        ``quality.DataQualityError`` is raised, and ``2`` when a
        ``FileNotFoundError`` is raised while opening the connection or
        running the stage.

    Raises:
        SystemExit: Raised by argparse for ``--help`` or invalid arguments.
    """
    parser = argparse.ArgumentParser(
        prog="commerce-pipeline",
        description=__doc__,
        # The module docstring contains a hand-formatted examples block; the
        # default formatter would re-wrap it into a single paragraph in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "stage",
        choices=["run", "ingest", "load", "transform", "quality"],
        help="pipeline stage to execute ('run' = full pipeline)",
    )
    parser.add_argument("-q", "--quiet", action="store_true", help="suppress progress logs")
    args = parser.parse_args(argv)

    _configure_logging(verbose=not args.quiet)
    s = get_settings()

    try:
        # 'run' and 'ingest' do not take a connection from this function,
        # so they return before one is opened.
        if args.stage == "run":
            summary = run_pipeline(s)
            print(summary.render())
            return 0

        if args.stage == "ingest":
            counts = ingest.run(s)
            print(f"ingest complete: {sum(counts.values()):,} rows across {len(counts)} tables")
            return 0

        # The remaining stages share one connection, closed even on failure.
        con = connect(s)
        try:
            if args.stage == "load":
                counts = load.run(con, s)
                print(f"load complete: {counts}")
            elif args.stage == "transform":
                marts = transform.run(con, s)
                print(f"transform complete: built marts {marts}")
            elif args.stage == "quality":
                results = quality.run(con, s)
                print(f"quality complete: {len(results)} checks passed")
        finally:
            con.close()
        return 0
    except quality.DataQualityError as exc:
        print(f"DATA QUALITY GATE FAILED: {exc}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 2
# Script entry point: hand main()'s return value to the interpreter as the exit status.
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())