data-gen / run_pipeline.py
ashish-sarvam's picture
Upload folder using huggingface_hub
fc1a684 verified
from __future__ import annotations
import argparse
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, List
from conv_data_gen.config import config
from conv_data_gen.steps import STEP_EXECUTORS
from conv_data_gen.runner import run_step_manifest
# ----------------------------- CONFIG SECTION -----------------------------
# Date used to build the dated run directory: the current local time minus
# one day, evaluated once when this module is executed.
RUN_DATE: datetime = datetime.now() - timedelta(days=1)
# Get configuration from centralized config
RUN_ID: str = config.generation.RUN_ID
THEME: str = config.generation.THEME
# Parameters stored under "params" in each step's manifest.json.
# NOTE(review): every `<literal> or config.generation.X` expression below
# evaluates to the literal, because the literal is truthy and `or`
# short-circuits; the corresponding config values are never used here.
# Confirm whether these hard-coded overrides are intentional.
GLOBAL_PARAMS: Dict[str, Any] = {
    "max_companies": 5 or config.generation.MAX_COMPANIES,
    "per_company_max": 5 or config.generation.PER_COMPANY_MAX,
    "seed": 142857 or config.generation.SEED,
    "similarity_threshold": 0.90 or config.generation.SIMILARITY_THRESHOLD,
    "similarity_threshold_use_case": 0.80
    or config.generation.SIMILARITY_THRESHOLD_USE_CASE,  # noqa: E501
    "embedding_model": "gemini-embedding-001"
    or config.generation.EMBEDDING_MODEL,
    "batch_size": 64 or config.generation.BATCH_SIZE,
    "num_personas": 5 or config.generation.NUM_PERSONAS,
    # No config counterpart is referenced for this key.
    "max_randomizer_usage": 5,
}
# Ordered list of step names; numeric step aliases are resolved against it.
STEPS_ORDER: List[str] = config.generation.STEPS_ORDER
# Steps to prepare/run; this is the same list object as STEPS_ORDER, not a copy.
SELECTED_STEPS: List[str] = STEPS_ORDER
# Defaults for the --mode and --step command-line options.
MODE: str = config.generation.MODE
DEFAULT_SINGLE_STEP: str = config.generation.DEFAULT_SINGLE_STEP
# --------------------------- END CONFIG SECTION ---------------------------
def _run_root() -> Path:
    """Return the output directory for this run.

    The path has the form ``RUNS_DIR/YYYY/MM/DD/THEME/RUN_ID``, where the
    date components come from ``RUN_DATE`` and are zero-padded.
    """
    year_part = f"{RUN_DATE.year:04d}"
    month_part = f"{RUN_DATE.month:02d}"
    day_part = f"{RUN_DATE.day:02d}"
    dated_dir = config.paths.RUNS_DIR / year_part / month_part / day_part
    return dated_dir / THEME / RUN_ID
def _write_manifest(step_dir: Path, step: str, params: Dict[str, Any]) -> None:
    """Create ``step_dir`` if necessary and write its ``manifest.json``.

    The manifest is an indented, UTF-8 encoded JSON object holding the run
    id, the step name and the step parameters (an empty object when
    ``params`` is falsy). An existing manifest file is overwritten.
    """
    step_dir.mkdir(parents=True, exist_ok=True)
    payload = {
        "run_id": RUN_ID,
        "step": step,
        "params": params if params else {},
    }
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    manifest_path = step_dir / "manifest.json"
    manifest_path.write_text(serialized, encoding="utf-8")
def _prepare_run_root(run_root: Path, steps: List[str]) -> None:
    """Write a manifest carrying ``GLOBAL_PARAMS`` for each step in ``steps``.

    Each manifest goes to ``run_root/<step>/manifest.json``.
    """
    for step_name in steps:
        _write_manifest(run_root / step_name, step_name, GLOBAL_PARAMS)
def _execute_all(run_root: Path, steps: List[str]) -> None:
    """Run each step in ``steps`` in order, skipping the ones that cannot run.

    A step is skipped (with a ``[skip]`` message) when ``STEP_EXECUTORS`` has
    no truthy entry for it, or when its directory lacks a ``manifest.json``.
    """
    for name in steps:
        executor = STEP_EXECUTORS.get(name)
        if executor:
            target_dir = run_root / name
            if (target_dir / "manifest.json").exists():
                print(f"[run] {name}")
                run_step_manifest(target_dir, executor)
            else:
                print(f"[skip] Missing manifest for {name}: {target_dir}")
        else:
            print(f"[skip] No executor for {name}")
def _execute_single(run_root: Path, step: str) -> None:
    """Run exactly one pipeline step, given by number or by full name.

    Args:
        run_root: Directory containing one sub-directory per step.
        step: Either a numeric alias such as ``"2"`` or ``"02"`` (resolved to
            the first entry of ``STEPS_ORDER`` that starts with ``"02-"``) or
            a full step name such as ``"02-usecases"``. Leading and trailing
            whitespace is ignored.

    Raises:
        RuntimeError: If a numeric alias matches no entry in ``STEPS_ORDER``,
            or if the resolved name has no executor in ``STEP_EXECUTORS``.
    """
    alias = step.strip()
    # Look names up by the stripped value so that " 02-usecases " behaves
    # exactly like "02-usecases".
    resolved = alias
    # isdecimal() accepts only characters that int() can parse; isdigit()
    # also accepts e.g. "²", which would make int() raise ValueError.
    if alias.isdecimal():
        prefix = f"{int(alias):02d}-"
        resolved = next(
            (name for name in STEPS_ORDER if name.startswith(prefix)), None
        )
        if resolved is None:
            raise RuntimeError(f"Unknown numeric step: {alias}")
    exec_fn = STEP_EXECUTORS.get(resolved)
    if not exec_fn:
        raise RuntimeError(f"Unknown step: {resolved}")
    step_dir = run_root / resolved
    # Only create a manifest when none exists; an existing one is kept as-is.
    if not (step_dir / "manifest.json").exists():
        _write_manifest(step_dir, resolved, GLOBAL_PARAMS)
    print(f"[run] {resolved}")
    run_step_manifest(step_dir, exec_fn)
def _print_help() -> None:
    """Print the step listing, usage examples and the status legend.

    Each entry of ``STEPS_ORDER`` is shown with its 1-based position, its
    name, a mark telling whether ``STEP_EXECUTORS`` contains it, and a
    title-cased description derived from the name.
    """
    for heading in (
        "Data Generation Pipeline",
        "=" * 50,
        "",
        "Available Steps:",
        "-" * 20,
    ):
        print(heading)

    for position, step in enumerate(STEPS_ORDER, start=1):
        step_num = f"{position:02d}"
        # Drop the "NN-" marker, then turn the remaining slug into words.
        description = step.replace(f"{step_num}-", "").replace("-", " ").title()
        status = "✓" if step in STEP_EXECUTORS else "✗"
        print(f" {step_num:>2}. {step:<20} {status} {description}")

    for trailer in (
        "",
        "Usage:",
        " python run_pipeline.py --mode all # Run all steps",
        " python run_pipeline.py --mode single --step 02 # Run step 2",
        " python run_pipeline.py --mode single --step 02-usecases",
        " python run_pipeline.py --list # Show this help",
        "",
        "Legend: ✓ = Available, ✗ = No executor",
    ):
        print(trailer)
def main() -> None:
    """Parse the command line and run the requested part of the pipeline.

    With ``--list`` the step overview is printed and nothing else happens.
    Otherwise the run directory is created, manifests are written for all
    ``SELECTED_STEPS``, and either every selected step (``--mode all``) or
    one step (``--mode single --step ...``) is executed.

    Raises:
        RuntimeError: If the effective mode is neither ``"all"`` nor
            ``"single"``.
    """
    examples = "\n".join(
        (
            "Examples:",
            "python run_pipeline.py --mode all # Run all steps",
            "python run_pipeline.py --mode single --step 02 # Run step 2 (usecases)",  # noqa: E501
            "python run_pipeline.py --mode single --step 02-usecases # Run by name",  # noqa: E501
            "python run_pipeline.py --list # Show all available steps",
        )
    )
    cli = argparse.ArgumentParser(
        description="Data Generation Pipeline Runner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    cli.add_argument(
        "--mode",
        choices=["single", "all"],
        default=MODE,
        help="Run all steps or a single step",
    )
    cli.add_argument(
        "--step",
        type=str,
        default=DEFAULT_SINGLE_STEP,
        help="Step number or name (used if --mode=single)",
    )
    cli.add_argument(
        "--list",
        action="store_true",
        help="Show all available steps and exit",
    )
    options = cli.parse_args()

    # Listing the steps is a read-only action: leave before touching disk.
    if options.list:
        _print_help()
        return

    print(
        f"=========RUNNNING MODE: {options.mode}, STEP: {options.step}=========="
    )
    root = _run_root()
    root.mkdir(parents=True, exist_ok=True)
    _prepare_run_root(root, SELECTED_STEPS)

    if options.mode == "all":
        _execute_all(root, SELECTED_STEPS)
        return
    if options.mode == "single":
        _execute_single(root, options.step)
        return
    raise RuntimeError(f"Unknown MODE: {options.mode}")
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()