Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import csv | |
| import gzip | |
| import json | |
| from pathlib import Path | |
| from typing import Any, Dict, List | |
| import pandas as pd | |
| from .config import config | |
| from .logger import setup_logger | |
| from .generators import ( | |
| ToolGenerator, | |
| BotGenerator, | |
| UserPersonaGenerator, | |
| ) | |
| from .generators.user_structured.user_card_generator import UserCardGenerator | |
| from .generators.enrichment.generator import generate_factsheets_from_csv | |
| from .generators.structured_use_case.plan_generator import ( | |
| generate_company_plans_from_factsheets, | |
| ) | |
| from .generators.structured_use_case.narrative_generator import ( | |
| generate_usecases_from_company_plans, | |
| flatten_use_cases, | |
| ) | |
| from .generators.conversation.jsonl_pipeline import ( | |
| run_conversations_from_artifacts, | |
| ) | |
| from .generators.checks.checker import run_checks | |
| from .generators.fine_tuning.generator import FineTuningDataGenerator | |
| from .generators.manipulations.manipulation_generator import ( | |
| apply_manipulations_to_conversations, | |
| ) | |
| from .dedup.user_card_dedup_jsonl import dedup_user_cards_artifact | |
| from .dedup.use_case_dedup import UseCaseEmbeddingsDeduper | |
# Module-wide logger obtained from the project's logging helper.
logger = setup_logger(__name__)

# Step-name -> executor registry; entries are added by the
# STEP_EXECUTORS.update(...) call that follows the executor definitions.
STEP_EXECUTORS: Dict[str, Any] = dict()
def _write_jsonl(path: Path, rows: List[Dict[str, Any]]) -> str:
    """Write *rows* to *path* as JSON Lines and return the path as a string.

    Missing parent directories are created. Each row is serialised with
    ``ensure_ascii=False`` and terminated by a newline; an empty *rows*
    produces an empty file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in rows
        )
    return str(path)
def _resolve_path(path_str: str) -> Path:
    """Expand ``~`` in *path_str* and anchor relative paths at the base dir.

    A path that is absolute after user expansion is returned as-is; a
    relative one is joined onto ``config.paths.BASE_DIR`` and resolved.
    """
    expanded = Path(path_str).expanduser()
    if expanded.is_absolute():
        return expanded
    return (config.paths.BASE_DIR / expanded).resolve()
def _read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Load a JSON Lines file (optionally gzip-compressed) into a list.

    Args:
        path: File to read. A name ending in ``.jsonl.gz`` is opened with
            gzip in text mode; anything else is opened as plain UTF-8 text.

    Returns:
        The parsed value of every line that holds valid JSON, in file order.
        Blank lines and lines that are not valid JSON are skipped, so a
        partially corrupt file still yields its good rows.
    """
    # A short suffix list simply fails the comparison, so no length check.
    is_gz = path.suffixes[-2:] == [".jsonl", ".gz"]
    opener = gzip.open if is_gz else open
    rows: List[Dict[str, Any]] = []
    with opener(path, "rt", encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                # Blank or trailing lines are not records.
                continue
            try:
                rows.append(json.loads(line))
            except json.JSONDecodeError:
                # Best-effort read: only malformed JSON is tolerated, any
                # other error propagates to the caller.
                continue
    return rows
def _coerce_to_list(value: Any) -> List[Any]:
    """Normalise a scalar, string or list cell value into a list.

    Conversion rules, applied in order:

    * a list is returned unchanged;
    * ``None`` and float NaN (a missing CSV cell) become ``[]``;
    * any other non-string value is wrapped as ``[value]``;
    * a blank string becomes ``[]``;
    * a string holding a JSON array is parsed;
    * a bracketed string holding a Python list literal (the ``str()`` form
      of a list, e.g. ``"['a', 'b']"``) is parsed with ``ast.literal_eval``;
    * otherwise the string is split on the first of ``;``, ``|``, ``,`` it
      contains, dropping empty segments;
    * a string with none of those separators becomes ``[text]``.
    """
    import ast
    import math

    if isinstance(value, list):
        return value
    if value is None:
        return []
    if isinstance(value, float) and math.isnan(value):
        # NaN would otherwise be emitted as [NaN], which is not valid JSON.
        return []
    if not isinstance(value, str):
        return [value]

    text = value.strip()
    if not text:
        return []

    try:
        parsed = json.loads(text)
    except ValueError:
        parsed = None
    if isinstance(parsed, list):
        return parsed

    if text.startswith("[") and text.endswith("]"):
        # Single-quoted list reprs are not JSON; literal_eval only accepts
        # literals, so it is safe on arbitrary cell text.
        try:
            literal = ast.literal_eval(text)
        except (
            ValueError,
            SyntaxError,
            TypeError,
            MemoryError,
            RecursionError,
        ):
            literal = None
        if isinstance(literal, list):
            return literal

    for sep in (";", "|", ","):
        if sep in text:
            parts = [seg.strip() for seg in text.split(sep) if seg.strip()]
            if parts:
                return parts
    return [text]
def _load_template(path: Path):
    """Return the full UTF-8 text of the file at *path*.

    Retained for backward compatibility in case other code still needs it;
    not used by this module at present.
    """
    with open(path, encoding="utf-8") as template_file:
        contents = template_file.read()
    return contents
def execute_step_01_enrichment(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate enriched company factsheets for downstream use-case generation.

    Optional ``params`` in *manifest*: ``enrichment_input_csv`` and
    ``enrichment_template_path`` override the defaults under
    ``config.paths.GENERATORS_DIR / "enrichment"``, and
    ``enrichment_max_workers`` (a non-negative integer or digit string)
    overrides the configured worker count. The factsheets returned by
    ``generate_factsheets_from_csv`` are written to
    ``company_factsheets.jsonl`` inside *step_dir*.

    Raises:
        FileNotFoundError: The input CSV or the template does not exist.
    """  # noqa
    params = manifest.get("params", {})
    enrichment_dir = config.paths.GENERATORS_DIR / "enrichment"

    csv_override = params.get("enrichment_input_csv")
    if csv_override:
        input_csv = _resolve_path(str(csv_override))
    else:
        input_csv = enrichment_dir / "companies_structured_mini.csv"

    template_override = params.get("enrichment_template_path")
    if template_override:
        template_path = _resolve_path(str(template_override))
    else:
        template_path = enrichment_dir / "prompts" / "prompt.j2"

    if not Path(input_csv).exists():
        raise FileNotFoundError(f"Enrichment input CSV not found: {input_csv}")
    if not Path(template_path).exists():
        raise FileNotFoundError(
            f"Enrichment template not found: {template_path}"
        )

    # Only plain digit values count as an explicit worker setting.
    workers_raw = params.get("enrichment_max_workers")
    explicit_workers = None
    if isinstance(workers_raw, (int, str)) and str(workers_raw).isdigit():
        explicit_workers = int(workers_raw)
    default_workers = (
        config.concurrency.USE_CASES_MAX_WORKERS
        or config.concurrency.DEFAULT_MAX_WORKERS
    )
    max_workers = max(1, explicit_workers or default_workers)

    logger.info(
        "[01-enrichment] input=%s template=%s workers=%d",
        str(input_csv),
        str(template_path),
        max_workers,
    )
    factsheets = generate_factsheets_from_csv(
        input_csv=str(input_csv),
        template_path=Path(template_path),
        max_workers=max_workers,
    )

    artifact = step_dir / "company_factsheets.jsonl"
    _write_jsonl(artifact, factsheets)
    outputs = [{"name": "company_factsheets", "uri": str(artifact)}]
    return {
        "status": "success",
        "outputs": outputs,
        "metrics": {"factsheets": len(factsheets)},
    }
def _jsonl_to_temp_csv(jsonl_path: Path, temp_csv: Path) -> str:
    """Convert a JSONL (or ``.jsonl.gz``) file into a CSV file.

    Args:
        jsonl_path: Source file; a ``.jsonl.gz`` name is read through gzip.
        temp_csv: Destination CSV path; parent directories are created.

    Returns:
        ``str(temp_csv)``.

    Raises:
        RuntimeError: If the source holds no parseable JSON objects.

    The CSV header is the union of the keys of all rows in first-seen order,
    so a key that first appears in a later row is not lost; rows lacking a
    key get an empty cell. Blank lines, malformed JSON and lines that are not
    JSON objects are skipped.
    """
    temp_csv.parent.mkdir(parents=True, exist_ok=True)

    is_gz = jsonl_path.suffixes[-2:] == [".jsonl", ".gz"]
    opener = gzip.open if is_gz else open
    rows: List[Dict[str, Any]] = []
    with opener(jsonl_path, "rt", encoding="utf-8") as src:
        for line in src:
            if not line.strip():
                continue
            try:
                parsed = json.loads(line)
            except json.JSONDecodeError:
                # Best-effort read: skip malformed lines only.
                continue
            if isinstance(parsed, dict):
                # Only JSON objects can be mapped onto CSV columns.
                rows.append(parsed)
    if not rows:
        raise RuntimeError("Empty JSONL input")

    # dict.fromkeys de-duplicates while preserving first-seen key order.
    headers = list(dict.fromkeys(key for row in rows for key in row))
    with open(temp_csv, "w", encoding="utf-8", newline="") as out:
        writer = csv.DictWriter(out, fieldnames=headers)
        writer.writeheader()
        for row in rows:
            writer.writerow({key: row.get(key, "") for key in headers})
    return str(temp_csv)
def execute_step_02_usecase_planning(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate plans per company from enriched factsheets (no narratives).

    Reads the ``company_factsheets`` artifact recorded by ``01-enrichment``,
    passes the factsheets to ``generate_company_plans_from_factsheets`` and
    writes two JSONL files into *step_dir*: ``company_plans.jsonl`` (the
    per-company packages as returned) and ``plans.jsonl`` (one flattened row
    per plan, for inspection).

    Args:
        step_dir: Output directory of this step; sibling step directories
            are looked up under its parent.
        manifest: Step manifest. Recognised ``params`` keys are
            ``plan_template_path`` and ``structured_usecase_max_workers``.

    Returns:
        Result dict with ``status``, ``outputs`` and ``metrics``.

    Raises:
        FileNotFoundError: Factsheets artifact or plan template is missing.
        RuntimeError: The JSON fallback file is invalid or is not a list.
    """
    prev_root = step_dir.parent
    factsheets_uri = _read_result_output(
        prev_root, "01-enrichment", "company_factsheets"
    )
    factsheets_path = Path(factsheets_uri)
    if not factsheets_path.exists():
        raise FileNotFoundError(
            f"Factsheets artifact not found: {factsheets_path}"
        )

    # Prefer JSONL (plain or gzip, both handled by _read_jsonl); anything
    # else is treated as a single JSON document holding a list.
    sufs = factsheets_path.suffixes
    is_jsonl = sufs[-1:] == [".jsonl"] or sufs[-2:] == [".jsonl", ".gz"]
    factsheets_data: List[Dict[str, Any]]
    if is_jsonl:
        factsheets_data = _read_jsonl(factsheets_path)
    else:
        try:
            loaded = json.loads(factsheets_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            raise RuntimeError(
                f"Invalid JSON factsheets at {factsheets_path}"
            ) from exc
        if not isinstance(loaded, list):
            raise RuntimeError("Factsheets artifact must be a list of objects")
        factsheets_data = loaded

    params = manifest.get("params", {})
    plan_tpl_path = config.paths.PLAN_PROMPT
    plan_tpl_override = params.get("plan_template_path")
    if plan_tpl_override:
        plan_tpl_path = _resolve_path(str(plan_tpl_override))
    if not Path(plan_tpl_path).exists():
        raise FileNotFoundError(f"Plan template not found: {plan_tpl_path}")

    # Only plain digit values count as an explicit worker setting.
    max_workers_param = params.get("structured_usecase_max_workers")
    configured_workers = (
        int(max_workers_param)
        if isinstance(max_workers_param, (int, str))
        and str(max_workers_param).isdigit()
        else None
    )
    fallback_workers = (
        config.concurrency.USE_CASES_MAX_WORKERS
        or config.concurrency.DEFAULT_MAX_WORKERS
    )
    max_workers = max(1, configured_workers or fallback_workers)

    logger.info(
        "[02-usecase-planning] template=%s workers=%d",
        str(plan_tpl_path),
        max_workers,
    )
    company_plans: List[Dict[str, Any]] = (
        generate_company_plans_from_factsheets(
            factsheets=factsheets_data,
            template_path=Path(plan_tpl_path),
            max_workers=max_workers,
        )
    )

    # Optional flat JSONL for inspection: one row per (company, plan).
    plan_fields = (
        "plan_id",
        "user_type",
        "agent_type",
        "conversation_direction",
        "trigger",
    )
    plan_rows: List[Dict[str, Any]] = []
    for pkg in company_plans:
        company_name = pkg.get("company", "")
        # "plans" may be missing or None; both are treated as no plans.
        for plan in pkg.get("plans", []) or []:
            row = {"company": company_name}
            row.update({field: plan.get(field, "") for field in plan_fields})
            plan_rows.append(row)

    agg_jsonl = step_dir / "company_plans.jsonl"
    _write_jsonl(agg_jsonl, company_plans)
    out_jsonl = step_dir / "plans.jsonl"
    _write_jsonl(out_jsonl, plan_rows)
    return {
        "status": "success",
        "outputs": [
            {"name": "company_plans", "uri": str(agg_jsonl)},
            {"name": "plans", "uri": str(out_jsonl)},
        ],
        "metrics": {
            "companies": len(company_plans),
            "plans": len(plan_rows),
        },
    }
def execute_step_03_usecases(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Expand plans into narratives and emit structured use-cases artifacts.

    Loads the ``company_plans`` artifact of ``02-usecase-planning``, calls
    ``generate_usecases_from_company_plans`` with the narrative template
    (``config.paths.NARRATIVE_PROMPT`` unless ``narrative_template_path`` is
    set in ``params``), flattens the result with ``flatten_use_cases`` and
    writes ``structured_usecases.jsonl`` into *step_dir*.

    Raises:
        FileNotFoundError: The plans artifact or the template is missing.
    """
    plans_path = Path(
        _read_result_output(
            step_dir.parent, "02-usecase-planning", "company_plans"
        )
    )
    if not plans_path.exists():
        raise FileNotFoundError(f"Company plans not found: {plans_path}")
    company_plans: List[Dict[str, Any]] = _read_jsonl(plans_path)

    params = manifest.get("params", {})
    template_override = params.get("narrative_template_path")
    if template_override:
        nar_tpl_path = _resolve_path(str(template_override))
    else:
        nar_tpl_path = config.paths.NARRATIVE_PROMPT
    if not Path(nar_tpl_path).exists():
        raise FileNotFoundError(
            f"Narrative template not found: {nar_tpl_path}"
        )

    # Only plain digit values count as an explicit worker setting.
    workers_raw = params.get("structured_usecase_max_workers")
    explicit_workers = None
    if isinstance(workers_raw, (int, str)) and str(workers_raw).isdigit():
        explicit_workers = int(workers_raw)
    default_workers = (
        config.concurrency.USE_CASES_MAX_WORKERS
        or config.concurrency.DEFAULT_MAX_WORKERS
    )
    max_workers = max(1, explicit_workers or default_workers)

    logger.info(
        "[03-usecases] narrative template=%s workers=%d",
        str(nar_tpl_path),
        max_workers,
    )
    results: List[Dict[str, Any]] = generate_usecases_from_company_plans(
        company_plans=company_plans,
        narrative_template_path=Path(nar_tpl_path),
        max_workers=max_workers,
    )
    flat_rows = flatten_use_cases(results)

    artifact = step_dir / "structured_usecases.jsonl"
    _write_jsonl(artifact, flat_rows)
    metrics = {"companies": len(results), "usecases_rows": len(flat_rows)}
    return {
        "status": "success",
        "outputs": [
            {"name": "structured_usecases", "uri": str(artifact)},
        ],
        "metrics": metrics,
    }
def execute_step_04_dedup_usecases(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Dedup use-cases via embeddings; emit JSONL artifact + result.json.

    Locates the ``structured_usecases`` artifact of ``03-usecases``,
    converts it to CSV when it is JSONL (plain or gzip), runs
    ``UseCaseEmbeddingsDeduper`` on it and converts the deduplicated CSV
    into ``usecases_dedup.jsonl`` inside *step_dir*.

    Args:
        step_dir: Output directory of this step.
        manifest: Step manifest. ``params`` must contain
            ``similarity_threshold_use_case``; ``embedding_model`` and
            ``batch_size`` are optional.

    Returns:
        Result dict with ``status``, ``outputs`` and the deduper's counts.

    Raises:
        FileNotFoundError: ``03-usecases/result.json`` is missing.
        RuntimeError: The previous step has no usable
            ``structured_usecases`` output.
        ValueError: ``similarity_threshold_use_case`` is not provided.
    """
    # Same lookup helper as the other steps; an empty URI is rejected here
    # because there would be nothing to deduplicate.
    out_uri = _read_result_output(
        step_dir.parent, "03-usecases", "structured_usecases"
    )
    if not out_uri:
        raise RuntimeError(
            "structured_usecases output not found in previous step"
        )

    # Convert JSONL -> CSV for the existing CSV-based deduper.
    src = Path(out_uri)
    sufs = src.suffixes
    input_csv_path: str
    if sufs[-1:] == [".jsonl"] or sufs[-2:] == [".jsonl", ".gz"]:
        input_csv_path = _jsonl_to_temp_csv(
            src, step_dir / "_usecases_input.csv"
        )
    else:
        input_csv_path = str(src)

    # Read params
    params = manifest.get("params", {})
    embedding_model = str(
        params.get("embedding_model", "gemini-embedding-001")
    )
    batch_size = int(params.get("batch_size", 64))
    # Validate before float(): float(None) would raise an opaque TypeError,
    # and an assert would be stripped under ``python -O``.
    raw_threshold = params.get("similarity_threshold_use_case")
    if raw_threshold is None:
        raise ValueError("similarity_threshold_use_case is required")
    threshold = float(raw_threshold)

    deduper = UseCaseEmbeddingsDeduper(
        project_id=config.gcp.PROJECT_ID,
        location=config.gcp.LOCATION,
        model_name=embedding_model,
        batch_size=batch_size,
    )
    dedup_res = deduper.run(
        input_csv=input_csv_path,
        output_dir=str(step_dir),
        threshold=threshold,
    )

    # Convert deduped CSV -> JSONL. Missing cells are blanked *before* the
    # list coercion so a NaN in a list column becomes [] instead of [nan],
    # which would serialise as invalid JSON.
    df = pd.read_csv(dedup_res.deduped_csv_path).fillna("")
    list_cols = [
        "kpi",
        "conversation_stages",
        "pain_points",
        "lines_of_business",
        "processes",
        "compliance_and_policies",
        "metrics",
    ]
    for col in list_cols:
        if col in df.columns:
            df[col] = df[col].apply(_coerce_to_list)
    recs = df.to_dict(orient="records")
    rows = [{str(k): v for k, v in r.items()} for r in recs]

    out_jsonl = step_dir / "usecases_dedup.jsonl"
    _write_jsonl(out_jsonl, rows)
    # NOTE(review): this file is presumably written by deduper.run() into
    # output_dir; it is not created here. Confirm against the deduper.
    report_path = step_dir / "dedup_report.json"
    return {
        "status": "success",
        "outputs": [
            {"name": "usecases_dedup", "uri": str(out_jsonl)},
            {"name": "dedup_report", "uri": str(report_path)},
        ],
        "metrics": {
            "input_count": dedup_res.input_count,
            "kept_count": dedup_res.kept_count,
            "removed_count": dedup_res.removed_count,
            "avg_nearest_similarity": dedup_res.avg_nearest_similarity,
        },
    }
def _read_result_output(step_root: Path, step_name: str, output: str) -> str:
    """Return the URI of a named output from a step's ``result.json``.

    The file ``step_root / step_name / "result.json"`` is parsed and its
    ``outputs`` list is searched for the first entry whose ``name`` equals
    *output*. A missing or falsy ``uri`` on that entry yields ``""``.

    Raises:
        FileNotFoundError: The step has no ``result.json``.
        RuntimeError: No output entry carries the requested name.
    """
    result_file = step_root / step_name / "result.json"
    if not result_file.exists():
        raise FileNotFoundError(f"Missing result.json in {step_name}")
    payload = json.loads(result_file.read_text(encoding="utf-8"))
    match = next(
        (
            entry
            for entry in payload.get("outputs", [])
            if entry.get("name") == output
        ),
        None,
    )
    if match is None:
        raise RuntimeError(f"Output {output} not found in {step_name}")
    return str(match.get("uri") or "")
def execute_step_05_tools(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the use-case -> tools map from the deduplicated use-cases.

    Hands the ``usecases_dedup`` artifact of ``04-dedup-usecases`` to
    ``ToolGenerator.generate_tools_map_from_usecases_artifact``, which is
    given ``usecase_tools_map.jsonl`` in *step_dir* as its output path.
    ``params`` may set ``per_company_max`` and ``max_companies``; values
    that are missing, falsy or not positive are passed on as ``None``.
    """
    usecases_dedup_uri = _read_result_output(
        step_dir.parent, "04-dedup-usecases", "usecases_dedup"
    )
    logger.info(
        "[05-tools] usecases_dedup uri: %s",
        usecases_dedup_uri,
    )
    tools_map_path = step_dir / "usecase_tools_map.jsonl"

    # Parse parameters for company-based filtering (0 means "no limit").
    params = manifest.get("params", {})
    per_company_limit = int(params.get("per_company_max", 0) or 0)
    company_limit = int(params.get("max_companies", 0) or 0)
    logger.info("[05-tools] max_companies: %s", company_limit)
    logger.info(
        "[05-tools] max_use_cases_per_company: %s", per_company_limit
    )

    company_cap = company_limit if company_limit > 0 else None
    per_company_cap = per_company_limit if per_company_limit > 0 else None
    tools_rows = ToolGenerator.generate_tools_map_from_usecases_artifact(
        usecases_path=usecases_dedup_uri,
        output_jsonl_path=str(tools_map_path),
        max_companies=company_cap,
        max_use_cases_per_company=per_company_cap,
    )
    logger.info(
        "[05-tools] wrote %d rows to %s",
        len(tools_rows),
        str(tools_map_path),
    )
    return {
        "status": "success",
        "outputs": [{"name": "tools_map", "uri": str(tools_map_path)}],
        "metrics": {"tool_specs": len(tools_rows)},
    }
def execute_step_06_bots(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate bot bundles from the tools map of ``05-tools``.

    Calls ``BotGenerator.generate_bundles_from_tools_map`` with
    ``bundles.jsonl`` and ``bundles.csv`` in *step_dir* as output paths and
    reports the JSONL file as the ``bundles`` output.
    """
    tools_map_uri = _read_result_output(
        step_dir.parent, "05-tools", "tools_map"
    )
    bundles_jsonl = str(step_dir / "bundles.jsonl")
    bundles_csv = str(step_dir / "bundles.csv")

    logger.info("[06-bots] tools_map uri: %s", tools_map_uri)
    logger.info(
        "[06-bots] output will be written to: %s",
        bundles_jsonl,
    )
    bundles = BotGenerator.generate_bundles_from_tools_map(
        tools_map_path=tools_map_uri,
        output_jsonl_path=bundles_jsonl,
        output_csv_path=bundles_csv,
    )
    bundle_count = len(bundles)
    logger.info(
        "[06-bots] wrote %d rows to %s",
        bundle_count,
        bundles_jsonl,
    )
    return {
        "status": "success",
        "outputs": [{"name": "bundles", "uri": bundles_jsonl}],
        "metrics": {"bundles": bundle_count},
    }
def execute_step_07_user_cards(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate user cards with personalities and goals from bot bundles.

    Feeds the ``bundles`` artifact of ``06-bots`` to
    ``UserCardGenerator.generate_user_cards_from_bundles_artifact`` with
    ``user_cards.jsonl`` in *step_dir* as the output path. The artifact is
    published under the output name ``proxies``.
    """
    bundles_uri = _read_result_output(step_dir.parent, "06-bots", "bundles")
    logger.info("[07-user-cards] bundles uri: %s", bundles_uri)

    cards_path = str(step_dir / "user_cards.jsonl")
    logger.info(
        "[07-user-cards] output will be written to: %s",
        cards_path,
    )
    user_cards = UserCardGenerator.generate_user_cards_from_bundles_artifact(
        bundles_path=bundles_uri,
        output_jsonl_path=cards_path,
    )
    card_count = len(user_cards)
    logger.info(
        "[07-user-cards] wrote %d user cards to %s",
        card_count,
        cards_path,
    )
    return {
        "status": "success",
        "outputs": [{"name": "proxies", "uri": cards_path}],
        "metrics": {"user_cards": card_count},
    }
def execute_step_08_dedup_proxies(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Deduplicate user cards based on conversation goals.

    Passes the ``proxies`` artifact of ``07-proxies`` to
    ``dedup_user_cards_artifact`` with ``bundle_proxy_map_dedup.jsonl`` in
    *step_dir* as the output path. Optional ``params``:
    ``similarity_threshold`` (default 0.90), ``embedding_model`` (default
    ``gemini-embedding-001``) and ``batch_size`` (default 64).
    """
    proxies_uri = _read_result_output(
        step_dir.parent, "07-proxies", "proxies"
    )
    dedup_path = step_dir / "bundle_proxy_map_dedup.jsonl"

    # Tunables, each with a default when absent from the manifest.
    params = manifest.get("params", {})
    dedup_kwargs = {
        "similarity_threshold": float(
            params.get("similarity_threshold", 0.90)
        ),
        "embedding_model": str(
            params.get("embedding_model", "gemini-embedding-001")
        ),
        "batch_size": int(params.get("batch_size", 64)),
    }

    # The first return value is unused; only the metrics are reported.
    _deduped, metrics = dedup_user_cards_artifact(
        user_cards_jsonl_path=proxies_uri,
        output_jsonl_path=str(dedup_path),
        **dedup_kwargs,
    )
    logger.info(
        "[08-dedup-proxies] Deduped user cards by conversation goals: "
        "%d kept (removed %d)",
        metrics.get("kept_count", 0),
        metrics.get("removed_count", 0),
    )
    return {
        "status": "success",
        "outputs": [{"name": "proxies_dedup", "uri": str(dedup_path)}],
        "metrics": metrics,
    }
def execute_step_09_personas(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate personas from the deduplicated proxies artifact.

    ``params["num_personas"]`` (default 2) is forwarded to
    ``UserPersonaGenerator.generate_personas_from_proxies_artifact``, which
    is given ``personas.jsonl`` in *step_dir* as its output path.
    """
    persona_count = int(manifest.get("params", {}).get("num_personas", 2))
    proxies_uri = _read_result_output(
        step_dir.parent, "08-dedup-proxies", "proxies_dedup"
    )
    personas_path = str(step_dir / "personas.jsonl")
    personas = UserPersonaGenerator.generate_personas_from_proxies_artifact(
        proxies_path=proxies_uri,
        output_jsonl_path=personas_path,
        num_personas=persona_count,
    )
    return {
        "status": "success",
        "outputs": [{"name": "personas", "uri": personas_path}],
        "metrics": {"personas": len(personas)},
    }
def execute_step_10_conversations(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Pair bundles with personas and simulate conversations (JSONL-first).

    Passes the ``bundles`` artifact of ``06-bots`` and the ``personas``
    artifact of ``09-personas`` to ``run_conversations_from_artifacts``,
    with ``conversations/`` under *step_dir* as the output directory, and
    stores the returned metrics in ``metrics.json``.

    Args:
        step_dir: Output directory of this step.
        manifest: Step manifest; ``params["max_randomizer_usage"]`` is
            required and must be convertible to ``int``.

    Returns:
        Result dict with ``status``, ``outputs`` and ``metrics``.

    Raises:
        ValueError: ``max_randomizer_usage`` is missing from ``params``.
    """
    prev_root = step_dir.parent
    bundles_uri = _read_result_output(prev_root, "06-bots", "bundles")
    personas_uri = _read_result_output(prev_root, "09-personas", "personas")
    conv_dir = step_dir / "conversations"

    params = manifest.get("params", {})
    # Check explicitly: int(None) would fail with an unhelpful TypeError
    # that does not name the missing parameter.
    raw_usage = params.get("max_randomizer_usage")
    if raw_usage is None:
        raise ValueError("max_randomizer_usage is required")
    max_randomizer_usage_param = int(raw_usage)

    metrics, _summaries = run_conversations_from_artifacts(
        bundles_uri=bundles_uri,
        personas_uri=personas_uri,
        output_dir=conv_dir,
        max_randomizer_usage=max_randomizer_usage_param,
    )
    metrics_path = step_dir / "metrics.json"
    metrics_path.write_text(
        json.dumps(metrics, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    logger.info("[10-conv] Metrics: %s", json.dumps(metrics))
    return {
        "status": "success",
        "outputs": [{"name": "metrics", "uri": str(metrics_path)}],
        "metrics": metrics,
    }
def execute_step_11_manipulations(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Apply manipulations to conversations like inserting random messages.

    Calls ``apply_manipulations_to_conversations`` with
    ``10-conv/conversations`` as input and ``conversations/`` under
    *step_dir* as output. ``params`` may override ``manipulation_types``
    and ``seed`` (default 42). The returned metrics dict gets a ``status``
    key added and is saved as ``metrics.json``.
    """
    source_dir = step_dir.parent / "10-conv" / "conversations"
    target_dir = step_dir / "conversations"
    logger.info("[11-manipulations] Starting conversation manipulations")

    # Manipulation settings come from the manifest, with defaults.
    params = manifest.get("params", {})
    default_types = ["random_message", "voice_translation", "memory_reference"]
    chosen_types = params.get("manipulation_types", default_types)
    rng_seed = params.get("seed", 42)

    metrics = apply_manipulations_to_conversations(
        input_dir=source_dir,
        output_dir=target_dir,
        manipulation_types=chosen_types,
        seed=rng_seed,
    )
    logger.info(
        "[11-manipulations] Processed %d conversations", metrics["processed"]
    )

    # The status is stored in the persisted metrics as well as the result.
    metrics["status"] = "success"
    metrics_path = step_dir / "metrics.json"
    serialized = json.dumps(metrics, ensure_ascii=False, indent=2)
    metrics_path.write_text(serialized, encoding="utf-8")

    outputs = [
        {"name": "conversations", "uri": str(target_dir)},
        {"name": "metrics", "uri": str(metrics_path)},
    ]
    return {"status": "success", "outputs": outputs, "metrics": metrics}
def execute_step_12_checks(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Run the checker over the manipulated conversations.

    ``run_checks`` receives ``11-manipulations/conversations``, the
    ``bundles`` artifact of ``06-bots``, the ``personas`` artifact of
    ``09-personas`` and ``checks.jsonl`` in *step_dir* as its output path.
    The returned metrics are saved as ``metrics.json``.
    """
    pipeline_root = step_dir.parent
    bundles_uri = _read_result_output(pipeline_root, "06-bots", "bundles")
    personas_uri = _read_result_output(
        pipeline_root, "09-personas", "personas"
    )
    conversations_dir = pipeline_root / "11-manipulations" / "conversations"
    checks_path = step_dir / "checks.jsonl"

    # Only the metrics are used here; the check rows are not read back.
    _checks, metrics = run_checks(
        conversations_dir=conversations_dir,
        bundles_uri=bundles_uri,
        personas_uri=personas_uri,
        output_jsonl_path=checks_path,
    )

    metrics_path = step_dir / "metrics.json"
    serialized = json.dumps(metrics, ensure_ascii=False, indent=2)
    metrics_path.write_text(serialized, encoding="utf-8")

    outputs = [
        {"name": "checks", "uri": str(checks_path)},
        {"name": "metrics", "uri": str(metrics_path)},
    ]
    return {"status": "success", "outputs": outputs, "metrics": metrics}
def execute_step_13_fine_tuning_data(
    step_dir: Path, manifest: Dict[str, Any]
) -> Dict[str, Any]:
    """Convert conversation data to a fine-tuning dataset using step 12 checks.

    Builds a ``FineTuningDataGenerator`` from
    ``11-manipulations/conversations`` and the ``checks`` artifact of
    ``12-checks``, then writes ``fine_tuning_dataset.jsonl`` and
    ``metrics.json`` into *step_dir*. A CSV copy is written, and listed in
    the outputs, only when the dataset has at least one row.

    Args:
        step_dir: Output directory of this step.
        manifest: Step manifest (not read by this step).

    Returns:
        Result dict with ``status``, ``outputs`` and ``metrics``.
    """  # noqa: E501
    prev_root = step_dir.parent
    conv_root = prev_root / "11-manipulations" / "conversations"
    # Checker results are produced by step 12.
    checks_uri = _read_result_output(prev_root, "12-checks", "checks")
    logger.info(
        "[13-fine-tuning] Starting fine-tuning data generation using "
        "step 12 checker results"
    )

    generator = FineTuningDataGenerator(
        conversations_dir=conv_root,
        checks_uri=checks_uri,
    )
    fine_tuning_rows, metrics = generator.generate_fine_tuning_dataset()

    # Write fine-tuning dataset
    out_jsonl = step_dir / "fine_tuning_dataset.jsonl"
    _write_jsonl(out_jsonl, fine_tuning_rows)
    outputs = [{"name": "fine_tuning_dataset", "uri": str(out_jsonl)}]

    # Also create a CSV version for easier inspection. It is advertised
    # only when written, so the result never points at a file that does
    # not exist (or at a stale one from an earlier run).
    if fine_tuning_rows:
        out_csv = step_dir / "fine_tuning_dataset.csv"
        pd.DataFrame(fine_tuning_rows).to_csv(
            out_csv, index=False, encoding="utf-8"
        )
        outputs.append({"name": "fine_tuning_dataset_csv", "uri": str(out_csv)})

    metrics_path = step_dir / "metrics.json"
    metrics_path.write_text(
        json.dumps(metrics, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    outputs.append({"name": "metrics", "uri": str(metrics_path)})
    return {
        "status": "success",
        "outputs": outputs,
        "metrics": metrics,
    }
# Register every step executor under its pipeline step name.
STEP_EXECUTORS.update(
    [
        ("01-enrichment", execute_step_01_enrichment),
        ("02-usecase-planning", execute_step_02_usecase_planning),
        ("03-usecases", execute_step_03_usecases),
        ("04-dedup-usecases", execute_step_04_dedup_usecases),
        ("05-tools", execute_step_05_tools),
        ("06-bots", execute_step_06_bots),
        ("07-proxies", execute_step_07_user_cards),
        ("08-dedup-proxies", execute_step_08_dedup_proxies),
        ("09-personas", execute_step_09_personas),
        ("10-conv", execute_step_10_conversations),
        ("11-manipulations", execute_step_11_manipulations),
        ("12-checks", execute_step_12_checks),
        ("13-fine-tuning", execute_step_13_fine_tuning_data),
    ]
)