Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import argparse | |
| import json | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, Any, List | |
| from conv_data_gen.config import config | |
| from conv_data_gen.steps import STEP_EXECUTORS | |
| from conv_data_gen.runner import run_step_manifest | |
# ----------------------------- CONFIG SECTION -----------------------------
# Date used for the YYYY/MM/DD part of the run directory: local "now" minus
# one day, captured once when the module is imported.
RUN_DATE: datetime = datetime.now() - timedelta(days=1)

# Get configuration from centralized config
RUN_ID: str = config.generation.RUN_ID
THEME: str = config.generation.THEME

# Parameters copied into every step manifest.
#
# NOTE(review): these entries used to be written as
# ``<literal> or config.generation.<NAME>``. Every literal is truthy, so
# ``or`` short-circuited and the config attribute was never evaluated -- the
# expression only looked like a config fallback. The literals are kept here
# so the values written to manifests do not change; the comment on each entry
# names the ``config.generation`` attribute it stands in for. To make an
# entry configurable, replace the literal with that attribute (after
# confirming the attribute exists -- it has never been read from here).
GLOBAL_PARAMS: Dict[str, Any] = {
    "max_companies": 5,  # MAX_COMPANIES
    "per_company_max": 5,  # PER_COMPANY_MAX
    "seed": 142857,  # SEED
    "similarity_threshold": 0.90,  # SIMILARITY_THRESHOLD
    "similarity_threshold_use_case": 0.80,  # SIMILARITY_THRESHOLD_USE_CASE
    "embedding_model": "gemini-embedding-001",  # EMBEDDING_MODEL
    "batch_size": 64,  # BATCH_SIZE
    "num_personas": 5,  # NUM_PERSONAS
    "max_randomizer_usage": 5,  # never had a config counterpart
}

# Ordered step names; SELECTED_STEPS is the same list object, not a copy.
STEPS_ORDER: List[str] = config.generation.STEPS_ORDER
SELECTED_STEPS: List[str] = STEPS_ORDER

# Defaults for the --mode / --step command-line options.
MODE: str = config.generation.MODE
DEFAULT_SINGLE_STEP: str = config.generation.DEFAULT_SINGLE_STEP
# --------------------------- END CONFIG SECTION ---------------------------


def _run_root() -> Path:
    """Return the directory for this run.

    The path is ``RUNS_DIR / YYYY / MM / DD / THEME / RUN_ID``, with the
    zero-padded date components taken from ``RUN_DATE``.
    """
    root = config.paths.RUNS_DIR
    segments = (
        f"{RUN_DATE.year:04d}",
        f"{RUN_DATE.month:02d}",
        f"{RUN_DATE.day:02d}",
        THEME,
        RUN_ID,
    )
    # Append one path component at a time, left to right.
    for segment in segments:
        root = root / segment
    return root


def _write_manifest(step_dir: Path, step: str, params: Dict[str, Any]) -> None:
    """Write ``manifest.json`` for one step, creating ``step_dir`` if needed.

    Args:
        step_dir: Directory that receives the manifest file.
        step: Step name recorded under the ``"step"`` key.
        params: Parameters recorded under ``"params"``; a falsy value is
            stored as an empty dict.
    """
    payload = {
        "run_id": RUN_ID,
        "step": step,
        "params": params if params else {},
    }
    # Create the directory before serialising, matching the original order.
    step_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = step_dir / "manifest.json"
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    manifest_path.write_text(serialized, encoding="utf-8")


def _prepare_run_root(run_root: Path, steps: List[str]) -> None:
    """Write a manifest carrying ``GLOBAL_PARAMS`` for every step in ``steps``.

    Each manifest goes into ``run_root / <step name>``.
    """
    for step_name in steps:
        _write_manifest(run_root / step_name, step_name, GLOBAL_PARAMS)


def _execute_all(run_root: Path, steps: List[str]) -> None:
    """Run every step in ``steps`` in order, skipping those that cannot run.

    A step is skipped (with a ``[skip]`` message) when it has no entry in
    ``STEP_EXECUTORS`` or when its directory has no ``manifest.json``.
    """
    for step_name in steps:
        executor = STEP_EXECUTORS.get(step_name)
        if executor:
            step_dir = run_root / step_name
            manifest_path = step_dir / "manifest.json"
            if manifest_path.exists():
                print(f"[run] {step_name}")
                run_step_manifest(step_dir, executor)
            else:
                print(f"[skip] Missing manifest for {step_name}: {step_dir}")
        else:
            print(f"[skip] No executor for {step_name}")


def _execute_single(run_root: Path, step: str) -> None:
    """Run one step, identified by number or by full name.

    Args:
        run_root: Run directory containing one sub-directory per step.
        step: Either a step number (``"2"`` or ``"02"``), which resolves to
            the first entry of ``STEPS_ORDER`` starting with the zero-padded
            ``NN-`` prefix, or a full step name used as the
            ``STEP_EXECUTORS`` key. Surrounding whitespace is ignored.

    Raises:
        RuntimeError: If a numeric step matches nothing in ``STEPS_ORDER``,
            or the resolved name has no executor.
    """
    alias = step.strip()
    # Use the stripped value for name lookups as well. Previously only the
    # numeric branch saw the stripped text, so " 02-usecases " was rejected.
    resolved = alias
    # isdecimal() (not isdigit()) so that only text int() can parse takes the
    # numeric path; e.g. "²" is a digit but makes int() raise ValueError.
    if alias.isdecimal():
        key = f"{int(alias):02d}"
        prefix = f"{key}-"
        matches = [s for s in STEPS_ORDER if s.startswith(prefix)]
        if not matches:
            raise RuntimeError(f"Unknown numeric step: {alias}")
        resolved = matches[0]

    exec_fn = STEP_EXECUTORS.get(resolved)
    if not exec_fn:
        raise RuntimeError(f"Unknown step: {resolved}")

    step_dir = run_root / resolved
    # Only write a manifest when none exists yet, so an existing one is kept.
    if not (step_dir / "manifest.json").exists():
        _write_manifest(step_dir, resolved, GLOBAL_PARAMS)
    print(f"[run] {resolved}")
    run_step_manifest(step_dir, exec_fn)


def _print_help() -> None:
    """Print help information including all available steps.

    Each entry of ``STEPS_ORDER`` is listed with its number, its full name, a
    mark showing whether it has an entry in ``STEP_EXECUTORS``, and a
    title-cased description derived from the name. Usage examples follow.
    """
    print("Data Generation Pipeline")
    print("=" * 50)
    print()
    print("Available Steps:")
    print("-" * 20)
    for position, step in enumerate(STEPS_ORDER, 1):
        # Show the number that ``--step`` actually resolves: _execute_single
        # matches on the step's own "NN-" prefix, not on its list position.
        # partition() also strips only the leading prefix, whereas
        # str.replace() removed every "NN-" occurrence in the name.
        prefix, sep, remainder = step.partition("-")
        if sep and prefix.isdecimal():
            step_num, step_name = prefix, remainder
        else:
            # No numeric prefix: fall back to the 1-based list position.
            step_num, step_name = f"{position:02d}", step
        step_description = step_name.replace("-", " ").title()
        # Check if step has an executor
        has_executor = step in STEP_EXECUTORS
        status = "✓" if has_executor else "✗"
        print(f" {step_num:>2}. {step:<20} {status} {step_description}")
    print()
    print("Usage:")
    print(" python run_pipeline.py --mode all # Run all steps")
    print(" python run_pipeline.py --mode single --step 02 # Run step 2")
    print(" python run_pipeline.py --mode single --step 02-usecases")
    print(" python run_pipeline.py --list # Show this help")
    print()
    print("Legend: ✓ = Available, ✗ = No executor")


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with --mode, --step and --list."""
    epilog_lines = (
        "Examples:",
        "python run_pipeline.py --mode all # Run all steps",
        "python run_pipeline.py --mode single --step 02 # Run step 2 (usecases)",  # noqa: E501
        "python run_pipeline.py --mode single --step 02-usecases # Run by name",  # noqa: E501
        "python run_pipeline.py --list # Show all available steps",
    )
    cli = argparse.ArgumentParser(
        description="Data Generation Pipeline Runner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="\n".join(epilog_lines),
    )
    cli.add_argument(
        "--mode",
        choices=["single", "all"],
        default=MODE,
        help="Run all steps or a single step",
    )
    cli.add_argument(
        "--step",
        type=str,
        default=DEFAULT_SINGLE_STEP,
        help="Step number or name (used if --mode=single)",
    )
    cli.add_argument(
        "--list",
        action="store_true",
        help="Show all available steps and exit",
    )
    return cli


def main() -> None:
    """Parse the command line, prepare the run directory and run the steps.

    With ``--list`` the step overview is printed and nothing else happens.
    Otherwise manifests are written for all ``SELECTED_STEPS`` and either
    every step or the single requested step is executed.

    Raises:
        RuntimeError: If the mode is neither ``"all"`` nor ``"single"``.
    """
    args = _build_parser().parse_args()

    # --list short-circuits before any directory is created.
    if args.list:
        _print_help()
        return

    print(f"=========RUNNNING MODE: {args.mode}, STEP: {args.step}==========")

    run_root = _run_root()
    run_root.mkdir(parents=True, exist_ok=True)
    _prepare_run_root(run_root, SELECTED_STEPS)

    if args.mode == "all":
        _execute_all(run_root, SELECTED_STEPS)
        return
    if args.mode == "single":
        _execute_single(run_root, args.step)
        return
    raise RuntimeError(f"Unknown MODE: {args.mode}")


if __name__ == "__main__":
    main()