| """ |
| Pure formatting functions for evox prompt generation. |
| All functions are stateless — no class or LLM dependencies. |
| """ |
|
|
| import logging |
| import os |
| from typing import Any, Dict, List, Optional, Union |
|
|
| from skydiscover.context_builder.utils import format_artifacts, prog_attr |
| from skydiscover.search.base_database import Program |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def filter_db_stats_by_horizon(db_stats: Dict[str, Any], horizon: int) -> Dict[str, Any]:
    """Trim the trajectory lists in ``db_stats`` to their last ``horizon`` items.

    Args:
        db_stats: Statistics mapping; only its ``"recent_solution_stats"``
            entry is inspected.
        horizon: Maximum number of trailing entries to keep.

    Returns:
        ``db_stats`` itself when it is empty or ``horizon`` is not positive.
        Otherwise a shallow copy in which ``"recent_solution_stats"`` (when
        present and non-empty) is replaced by a copy whose
        ``"execution_trace"``, ``"score_trajectory"`` and ``"parent_scores"``
        lists are cut to the last ``horizon`` items and whose
        ``"num_recent_iterations"`` is capped at ``horizon``.
    """
    if not db_stats or horizon <= 0:
        return db_stats

    result = dict(db_stats)
    recent_stats = db_stats.get("recent_solution_stats")
    if not recent_stats:
        return result

    # Work on a copy so the caller's nested dict is never mutated.
    trimmed = dict(recent_stats)
    for field in ("execution_trace", "score_trajectory", "parent_scores"):
        series = recent_stats.get(field)
        if series and len(series) > horizon:
            trimmed[field] = series[-horizon:]

    reported = recent_stats.get("num_recent_iterations", 0)
    trimmed["num_recent_iterations"] = min(horizon, reported)
    result["recent_solution_stats"] = trimmed
    return result
|
|
|
|
def format_execution_trace(execution_trace: list, window_start_score: float = None) -> str:
    """Render an execution trace as two text lines per iteration.

    Each trace entry is a mapping read via ``"program"``, ``"parent"``,
    ``"context"`` and ``"iteration"``.  Program references are sequences of
    either ``(label, id, score)`` or ``(id, score)``.  Entries whose
    ``"program"`` is ``None`` are skipped.

    Args:
        execution_trace: Sequence of trace entries.
        window_start_score: Best score known before the first entry; ``None``
            makes the first scored child the initial best.

    Returns:
        The formatted entries separated by a blank line, or ``""`` when there
        is nothing to show.
    """
    if not execution_trace:
        return ""

    def short_id(program_id):
        # Long ids are abbreviated to their first 8 characters.
        if not program_id:
            return "None"
        return program_id[:8] if len(program_id) > 8 else program_id

    def score_text(value):
        return "N/A" if value is None else f"{value:.4f}"

    def split_ref(ref):
        # Normalise a reference to (label, id, score); short refs carry no label.
        if not ref:
            return None, None, None
        if len(ref) < 3:
            return None, ref[0], ref[1]
        return ref[0], ref[1], ref[2]

    def describe_ref(ref, prefix=""):
        label, program_id, score = split_ref(ref)
        if program_id is None:
            return f"{prefix}=None (seed program)" if prefix else "None"
        label_part = f'label="{label}", ' if label else ""
        details = f"({label_part}id={short_id(program_id)}, score={score_text(score)})"
        return f"{prefix} {details}" if prefix else details

    blocks = []
    running_best = window_start_score

    for step in execution_trace:
        child_ref = step.get("program")
        if child_ref is None:
            continue

        parent_ref = step.get("parent")
        child_score = split_ref(child_ref)[2]
        parent_score = split_ref(parent_ref)[2]

        parent_text = describe_ref(parent_ref, "Parent")
        context_refs = step.get("context") or []
        context_text = ", ".join(describe_ref(ref) for ref in context_refs)

        if child_score is None:
            outcome = "N/A"
        else:
            # Compare at the displayed precision.
            child_score = round(child_score, 4)
            if parent_score is not None:
                parent_score = round(parent_score, 4)

            if running_best is None:
                running_best = child_score
                outcome = "first program"
            elif child_score > running_best:
                outcome = f"⭐ NEW BEST (was {running_best:.4f})"
                running_best = child_score
            elif parent_score is not None and child_score > parent_score:
                outcome = f"above parent, best still {running_best:.4f}"
            elif parent_score is not None and child_score < parent_score:
                outcome = f"regression, best still {running_best:.4f}"
            else:
                outcome = f"no change, best still {running_best:.4f}"

        blocks.append(
            f"Iter {step.get('iteration', '?')}: {parent_text}, Context=[{context_text}]\n"
            f" -> Generated child score={score_text(child_score)} ({outcome})"
        )

    # One blank line between consecutive entries, none after the last.
    return "\n\n".join(blocks)
|
|
|
|
def format_db_stats_diff(
    start_stats: Dict[str, Any], end_stats: Dict[str, Any], horizon: Optional[int] = None
) -> str:
    """Describe how the population statistics moved across a search window.

    Compares the two snapshots field by field (population size, score
    summary quantiles, top scores, parent-sharing percentage, gap to
    ``SOTA_score``, score tiers) and then appends the recent-iteration
    details read from ``end_stats["recent_solution_stats"]``: either the
    formatted execution trace or, when no trace is stored, the raw score and
    parent-score trajectories.

    Args:
        start_stats: Statistics captured at the start of the window.
        end_stats: Statistics captured at the end of the window.
        horizon: When truthy, only the last ``horizon`` trace entries are shown.

    Returns:
        The report as newline-joined text, or ``""`` if either snapshot is
        empty.
    """
    if not (start_stats and end_stats):
        return ""

    def plus_if_non_negative(delta):
        # Negative numbers already carry their own "-" when formatted.
        return "+" if delta >= 0 else ""

    out = ["Population Statistics Change (Start -> End of Search Window):"]

    pop_before = start_stats.get("population_size", "?")
    pop_after = end_stats.get("population_size", "?")
    out.append(f"- population_size: {pop_before} -> {pop_after}")

    summary_before = start_stats.get("solution_score_summary", {})
    summary_after = end_stats.get("solution_score_summary", {})
    if summary_before and summary_after:
        quantile_labels = (
            ("best", "current_best"),
            ("q75", "75th_pct"),
            ("q50", "50th_pct (median)"),
            ("q25", "25th_pct"),
            ("worst", "worst"),
        )
        changes = []
        for field, shown_as in quantile_labels:
            before = summary_before.get(field)
            after = summary_after.get(field)
            if before is None or after is None:
                continue
            delta = after - before
            changes.append(
                f"{shown_as}: {before:.4f} -> {after:.4f} ({plus_if_non_negative(delta)}{delta:.4f})"
            )
        if changes:
            out.append("- " + ", ".join(changes))

    top_before = start_stats.get("top_solution_scores", [])
    top_after = end_stats.get("top_solution_scores", [])
    if top_before and top_after:
        # Compare the same number of entries on both sides.
        k = min(len(top_before), len(top_after))
        before_fmt = [f"{score:.4f}" for score in top_before[:k]]
        after_fmt = [f"{score:.4f}" for score in top_after[:k]]
        out.append(f"- top_{k}_solution_scores: {before_fmt} -> {after_fmt}")

    avg_before = start_stats.get("avg_solutions_per_parent")
    avg_after = end_stats.get("avg_solutions_per_parent")
    if avg_before is not None and avg_after is not None and pop_before and pop_after:
        # "?" is the placeholder for an unknown population size.
        share_before = 0 if pop_before == "?" else avg_before / pop_before * 100
        share_after = 0 if pop_after == "?" else avg_after / pop_after * 100
        out.append(
            f"- % of solutions share the same parent on average: {share_before:.1f}% -> {share_after:.1f}%"
        )

    sota = end_stats.get("SOTA_score")
    if sota is not None and summary_before and summary_after:
        best_before = summary_before.get("best")
        best_after = summary_after.get("best")
        if best_before is not None and best_after is not None:
            gap_before = sota - best_before
            gap_after = sota - best_after
            gap_delta = gap_after - gap_before
            out.append(
                f"- gap_to_SOTA (lower is better): {gap_before:.4f} -> {gap_after:.4f} ({plus_if_non_negative(gap_delta)}{gap_delta:.4f})"
            )

    tiers_before = summary_before.get("score_tiers") if summary_before else None
    tiers_after = summary_after.get("score_tiers") if summary_after else None
    if tiers_before and tiers_after:
        tier_rows = []
        # Tier names come from the end snapshot; a tier absent at the start counts as 0%.
        for tier in tiers_after:
            info_before = tiers_before.get(tier, {})
            info_after = tiers_after.get(tier, {})
            pct_before = info_before.get("pct_programs", 0)
            pct_after = info_after.get("pct_programs", 0)
            thr_before = info_before.get("threshold", "")
            thr_after = info_after.get("threshold", "")
            pct_delta = pct_after - pct_before
            tier_rows.append(
                f"\n {tier}: [{thr_before}] {pct_before:.0f}% -> [{thr_after}] {pct_after:.0f}% ({plus_if_non_negative(pct_delta)}{pct_delta:.0f}%)"
            )
        out.append(f"- programs_by_score_tier:{','.join(tier_rows)}")

    recent = end_stats.get("recent_solution_stats", {})
    if recent:
        stalled = recent.get("iterations_without_improvement")
        threshold = recent.get("improvement_threshold", 0.0)
        if stalled is not None:
            qualifier = f" (improvement <= {threshold:.4f})" if threshold > 0 else ""
            out.append(f"- iterations_without_improvement{qualifier}: {stalled}")

        trace = recent.get("execution_trace")
        if trace:
            if horizon:
                trace = trace[-horizon:]

            first_iter = trace[0].get("iteration", "?")
            last_iter = trace[-1].get("iteration", "?")
            out.append(f"\n### Execution Trace (iterations {first_iter}-{last_iter})")
            baseline = summary_before.get("best") if summary_before else None
            out.append(format_execution_trace(trace, window_start_score=baseline))
        else:
            # No trace stored: fall back to the raw score lists.
            trajectory = recent.get("score_trajectory")
            if trajectory:
                shown = [f"{s:.4f}" if s is not None else "N/A" for s in trajectory]
                out.append(f"- recent_score_trajectory (last {len(trajectory)}): {shown}")
            recent_parents = recent.get("parent_scores")
            if recent_parents:
                shown = [f"{s:.4f}" if s is not None else "N/A" for s in recent_parents]
                out.append(f"- recent_parent_scores: {shown}")

    return "\n".join(out)
|
|
|
|
def format_population_state(db_stats: Dict[str, Any]) -> str:
    """Format the population state from db_stats into clean, actionable lines.

    Reads ``population_size``, ``solution_score_summary``, ``SOTA_score``,
    ``avg_solutions_per_parent``, ``top_solution_scores`` and
    ``recent_solution_stats`` from ``db_stats`` and emits one ``- ...`` line
    per available fact.

    Percentile values are tested with ``is not None`` rather than truthiness,
    so a percentile of exactly ``0.0`` is a real value: it is shown in the
    distribution and used as a bucket boundary.

    Args:
        db_stats: Statistics mapping for the current population.

    Returns:
        Newline-joined report, or ``""`` when ``db_stats`` is empty.
    """
    if not db_stats:
        return ""

    def fmt_scores(scores):
        return [f"{s:.4f}" if s is not None else "N/A" for s in scores]

    lines = []
    pop_size = db_stats.get("population_size")
    lines.append(f"- population_size: {pop_size}")

    score_summary = db_stats.get("solution_score_summary") or {}
    sota = db_stats.get("SOTA_score")
    best = score_summary.get("best")
    q75 = score_summary.get("q75")
    q50 = score_summary.get("q50")
    if q50 is None:
        # "median" is only a fallback when "q50" is missing, not when it is 0.
        q50 = score_summary.get("median")
    q25 = score_summary.get("q25")
    worst = score_summary.get("worst")

    if best is not None:

        def pct_of_best(value):
            # Relative size is only meaningful against a positive best score.
            return value / best * 100 if best > 0 else 0

        dist_parts = [f"current_best={best:.4f}"]
        for name, val in (("75th_pct", q75), ("50th_pct", q50), ("25th_pct", q25)):
            if val is not None:
                dist_parts.append(f"{name}={val:.4f} ({pct_of_best(val):.0f}%)")
        if worst is not None:
            dist_parts.append(f"worst={worst:.4f}")

        lines.append(f"- score_distribution: {', '.join(dist_parts)}")
        if sota is not None:
            lines.append(f"- gap_to_SOTA: SOTA={sota:.4f}, gap={sota - best:.4f}")

    if tiers := score_summary.get("score_tiers"):
        tier_parts = [
            f"{n} ({d.get('threshold','')}): {d.get('pct_programs',0):.0f}%"
            for n, d in tiers.items()
        ]
        lines.append(f"- programs_by_score_tier: {', '.join(tier_parts)}")

    if (unique := score_summary.get("unique_scores")) is not None:
        lines.append(f"- unique_score_values: {unique}")

    if (avg := db_stats.get("avg_solutions_per_parent")) is not None and pop_size:
        lines.append(f"- {avg / pop_size * 100:.1f}% of solutions share the same parent on average")

    if top_scores := db_stats.get("top_solution_scores"):
        best_score = top_scores[0]
        if isinstance(best_score, (int, float)):
            # Count ties with the leading score at the displayed precision.
            best_count = sum(
                1
                for s in top_scores
                if isinstance(s, (int, float)) and round(s, 4) == round(best_score, 4)
            )
        else:
            best_count = 0
        lines.append(f"- top_{len(top_scores)}_scores: {fmt_scores(top_scores)}")
        if best_count > 1:
            lines.append(f" - Top score ({best_score:.4f}) repeated {best_count}x")
            if best_count == len(top_scores):
                lines.append(f" (⚠️ ALL {best_count} identical)")

    if recent := db_stats.get("recent_solution_stats"):
        if (iters := recent.get("iterations_without_improvement")) and iters > 0:
            # A stored None threshold is treated like "no threshold".
            thresh = recent.get("improvement_threshold") or 0.0
            thresh_str = f" by more than {thresh:.4f}" if thresh > 0 else ""
            lines.append(f"- No improvement{thresh_str} for {iters} iterations")

        def score_bucket(score):
            # Map a score onto the percentile band it falls in.
            if score is None or best is None:
                return None
            if score >= best:
                return "at best"
            if q75 is not None and score >= q75:
                return "75-100th"
            if q50 is not None and score >= q50:
                return "50-75th"
            if q25 is not None and score >= q25:
                return "25-50th"
            return "0-25th"

        for key, label in (("most_reused_parent", "parent"), ("most_reused_context", "context")):
            if (ratio := recent.get(f"{key}_ratio")) and ratio > 0:
                bucket = score_bucket(recent.get(f"{key}_score"))
                score_str = f", score {bucket}" if bucket else ""
                lines.append(f"- {label}: {ratio*100:.0f}% reuse rate{score_str}")

        if traj := recent.get("score_trajectory"):
            lines.append(f"- recent_scores (last {len(traj)}): {fmt_scores(traj)}")
        if parent := recent.get("parent_scores"):
            lines.append(f"- recent_parent_scores: {fmt_scores(parent)}")

    return "\n".join(lines)
|
|
|
|
def format_current_program(
    current_program: Union[Program, Dict[str, Program]],
    language: str,
    improvement_areas: Optional[str] = None,
) -> str:
    """Format current program with metrics and solution.

    Args:
        current_program: Either a program, or a mapping whose first item is
            ``label -> program``; an empty label falls back to
            ``"Current Search Program"``.
        language: Language tag placed on the fenced solution block.
        improvement_areas: Optional pre-formatted text listed under
            "Focus areas".

    Returns:
        Markdown text with a metrics section, the fenced solution and, when
        ``format_artifacts`` returns something, the artifact section.
        ``""`` when ``current_program`` is empty.
    """
    if not current_program:
        return ""

    if isinstance(current_program, dict):
        first_label, program = next(iter(current_program.items()))
        label = first_label or "Current Search Program"
    else:
        label = "Current Search Program"
        program = current_program
    solution = prog_attr(program, "solution")
    # Tolerate a program whose metrics are stored as None.
    metrics = prog_attr(program, "metrics", {}) or {}

    # ``or`` guards cover metrics that are present but stored as None.
    window_start = int(metrics.get("window_start_iteration") or 0)
    horizon = int(metrics.get("search_window_horizon") or 0)
    window_end = window_start + horizon
    start_score = metrics.get("search_window_start_score") or 0.0
    end_score = metrics.get("search_window_end_score") or 0.0
    combined_score = metrics.get("combined_score") or 0.0
    improvement = end_score - start_score

    lines = [f"## {label}\n", "### Metrics"]
    if improvement_areas:
        lines.append(f"Focus areas:\n{improvement_areas}\n")
    lines.append(f"Search Algorithm Score = {combined_score:.4f}")
    lines.append(
        f"This search algorithm ran from iteration {window_start} to {window_end} ({horizon} iterations)"
    )
    # The "+" format flag emits the correct sign, so a regression reads
    # "(-0.1000)" instead of "(+-0.1000)".
    lines.append(
        f"This search algorithm changed the downstream solution combined_score by: {start_score:.4f} -> {end_score:.4f} ({improvement:+.4f})"
    )
    lines.append(f"\n### Solution\n```{language}")
    lines.append(solution)
    lines.append("```\n")

    artifact_section = format_artifacts(program, heading="###")
    if artifact_section:
        lines.append(artifact_section)

    return "\n".join(lines)
|
|
|
|
def identify_search_improvement_areas(
    current_program: Program,
    metrics: Dict[str, float],
    previous_programs: List[Program],
    simplification_threshold: Optional[int] = None,
) -> str:
    """Build the bullet list of focus areas for a search algorithm.

    The current ``combined_score`` (from ``metrics``) is compared with the
    ``combined_score`` of the last entry in ``previous_programs``.  Values
    that cannot be converted to ``float`` count as ``0.0``.

    Args:
        current_program: Program whose solution length is checked against
            ``simplification_threshold``.
        metrics: Metrics of the current program.
        previous_programs: Earlier programs; only the last one is consulted.
        simplification_threshold: When truthy, a hint is added if the
            solution is longer than this many characters.

    Returns:
        One ``- ...`` line per focus area, joined with newlines.
    """

    def to_score(raw):
        # float(None) raises TypeError, so missing values also land on 0.0.
        try:
            return float(raw)
        except (TypeError, ValueError):
            return 0.0

    score_now = to_score(metrics.get("combined_score"))

    if previous_programs:
        last_metrics = prog_attr(previous_programs[-1], "metrics", {}) or {}
        score_before = to_score(last_metrics.get("combined_score"))

        if score_now > score_before:
            trend = f"Search algorithm score improved: {score_before:.4f} → {score_now:.4f}"
        elif score_now < score_before:
            trend = f"Search algorithm score declined: {score_before:.4f} → {score_now:.4f}. Consider revising."
        else:
            trend = f"Search algorithm score unchanged at {score_now:.4f}"
        notes = [trend]
    else:
        # Nothing to compare against: fall back to the generic goal.
        notes = ["Focus on improving the search algorithm score (combined_score)"]

    if simplification_threshold:
        solution_length = len(prog_attr(current_program, "solution"))
        if solution_length > simplification_threshold:
            notes.append(
                f"Consider simplifying - solution length exceeds {simplification_threshold} characters"
            )

    return "\n".join(f"- {note}" for note in notes)
|
|
|
|
def format_search_window_context(context: Dict[str, Any]) -> str:
    """Format the current search window context from context['search_stats'].

    Missing or ``None`` values fall back to defaults: start iteration 0,
    100 total iterations, horizon 0 and improvement threshold 0.0.  The
    sentence about the minimum run length is only added for a positive
    horizon, and the "by more than X" qualifier only for a positive
    threshold.

    Args:
        context: Mapping that may hold a ``"search_stats"`` dict; a ``None``
            or empty context yields the default text.

    Returns:
        Four ``- ...`` lines joined with newlines.
    """
    stats = (context or {}).get("search_stats") or {}
    window_start = int(stats.get("window_start_iteration") or 0)
    total = int(stats.get("total_iterations") or 100)
    # ``or 0`` matches the sibling fields: an explicit None must not reach int().
    horizon = int(stats.get("search_window_horizon") or 0)
    improvement_threshold = float(stats.get("improvement_threshold") or 0.0)

    # Shared qualifier for both sentences that mention the required improvement.
    margin = f" by more than {improvement_threshold:.4f}" if improvement_threshold > 0 else ""

    window_line = f"- Your newly designed search algorithm will start at iteration {window_start} out of {total}."
    if horizon > 0:
        window_line += (
            f" It will run for at least {horizon} iterations (potentially more if improving),"
            f" but will be cut to just {horizon} iterations if it fails to improve the solution score{margin}."
        )

    lines = [
        window_line,
        f"- If your algorithm fails to improve the solution score{margin} during this window, it will be replaced.",
        "- Goal: Design a better search strategy (e.g. how to select and manage solution programs) to improve the downstream solution score.",
        "- NOTE: Exactly one program is generated per iteration. Keep the population size in mind when designing your search algorithm.",
    ]
    return "\n".join(lines)
|
|
|
|
def format_problem_description(problem_config: Any) -> str:
    """Turn the prompt config into the problem-description text.

    Strings are returned unchanged (including the empty string).  For other
    objects a truthy ``system_message`` attribute wins; otherwise the object
    itself is stringified.  ``None`` and other falsy non-string values yield
    a placeholder.
    """
    placeholder = "(No problem description provided)"

    if problem_config is None:
        return placeholder
    if isinstance(problem_config, str):
        return problem_config

    system_message = getattr(problem_config, "system_message", None)
    if system_message:
        return str(system_message)

    if not problem_config:
        return placeholder
    return str(problem_config)
|
|
|
|
def format_evaluator_context(evaluator_path: Any) -> str:
    """Format the evaluator context by reading the evaluator file.

    Behaviour by input:
      * ``None``: a placeholder string.
      * A string not ending in ``.py``: treated as inline evaluator text and
        wrapped in a ``python`` code fence, unless it already starts with a
        fence, in which case it is returned unchanged.
      * A string ending in ``.py`` that names an existing file: the file's
        contents, read as UTF-8 and fenced.
      * Anything else (missing or unreadable file, non-string value):
        ``"Evaluator file: <value>"``.

    Read failures are logged as warnings and never raised.
    """
    if evaluator_path is None:
        return "(No evaluator context provided)"

    if isinstance(evaluator_path, str):
        if not evaluator_path.endswith(".py"):
            if evaluator_path.strip().startswith("```"):
                return evaluator_path
            return f"```python\n{evaluator_path}\n```"
        try:
            if os.path.isfile(evaluator_path):
                # Explicit UTF-8 keeps decoding independent of the platform locale.
                with open(evaluator_path, "r", encoding="utf-8") as f:
                    return f"```python\n{f.read()}\n```"
        except (OSError, ValueError) as e:
            # OSError covers I/O failures; UnicodeDecodeError is a ValueError.
            logger.warning("Failed to read evaluator file %s: %s", evaluator_path, e)

    return f"Evaluator file: {evaluator_path}"
|
|
|
|
def prepare_search_algorithms_data(
    other_context_programs: Union[List[Program], Dict[str, List[Program]]],
    format_stats_diff=format_db_stats_diff,
    filter_by_horizon=filter_db_stats_by_horizon,
) -> List[Dict[str, Any]]:
    """Collect the per-program records used for batch summarization.

    A dict input is flattened group by group (empty groups are ignored);
    programs are then numbered from 1 in that order.  A record is produced
    only for programs whose metadata holds both ``start_db_stats`` and
    ``end_db_stats`` (and whose stats are still non-empty after horizon
    filtering), so numbers of skipped programs are simply absent.

    Args:
        other_context_programs: Programs as a list or as ``label -> list``.
        format_stats_diff: Callable producing the stats comparison text.
        filter_by_horizon: Callable trimming a stats dict to a horizon.

    Returns:
        A list of dicts with keys ``program_num``, ``solution``,
        ``db_stats_text``, ``combined_score`` and ``improvement``.
    """
    if not other_context_programs:
        return []

    if isinstance(other_context_programs, dict):
        candidates = [
            program
            for group in other_context_programs.values()
            if group
            for program in group
        ]
    else:
        candidates = other_context_programs

    records = []
    for number, program in enumerate(candidates, start=1):
        solution = prog_attr(program, "solution")
        metrics = prog_attr(program, "metrics", {})
        metadata = prog_attr(program, "metadata", {})

        stats_before = metadata.get("start_db_stats")
        stats_after = metadata.get("end_db_stats")
        horizon = int(metrics.get("search_window_horizon", 0))

        if not (stats_before and stats_after):
            continue

        stats_before = filter_by_horizon(stats_before, horizon)
        stats_after = filter_by_horizon(stats_after, horizon)
        # The injected filter may hand back something empty; re-check.
        if not (stats_before and stats_after):
            continue

        stats_text = format_stats_diff(stats_before, stats_after, horizon=horizon)
        records.append(
            {
                "program_num": number,
                "solution": solution,
                "db_stats_text": stats_text,
                "combined_score": metrics.get("combined_score", 0.0),
                "improvement": metrics.get("search_window_end_score", 0.0)
                - metrics.get("search_window_start_score", 0.0),
            }
        )

    return records
|
|
|
|
def format_single_program_section(
    program: Program, idx: int, language: str, summaries_by_num: Dict[int, str]
) -> List[str]:
    """Format a single program with metrics and solution/summary.

    Args:
        program: Program to render.
        idx: Number shown in the heading and used to look up a summary.
        language: Language tag for the fenced solution block.
        summaries_by_num: Summaries keyed by program number; when ``idx`` is
            present the summary replaces the full solution.

    Returns:
        The section as a list of text lines (not yet joined).
    """
    solution = prog_attr(program, "solution")
    # Tolerate a program whose metrics are stored as None.
    metrics = prog_attr(program, "metrics", {}) or {}

    # ``or`` guards cover metrics that are present but stored as None.
    window_start = int(metrics.get("window_start_iteration") or 0)
    horizon = int(metrics.get("search_window_horizon") or 0)
    start_score = metrics.get("search_window_start_score") or 0.0
    end_score = metrics.get("search_window_end_score") or 0.0
    combined_score = metrics.get("combined_score") or 0.0
    score_delta = end_score - start_score

    lines = [
        f"### Program {idx}\n",
        "#### Metrics",
        f"Search Algorithm Score = {combined_score:.4f}",
        f"Ran iterations {window_start} to {window_start + horizon} ({horizon} iterations)",
        # The "+" format flag emits the correct sign, so a regression reads
        # "(-0.1000)" instead of "(+-0.1000)".
        f"Score changed: {start_score:.4f} -> {end_score:.4f} ({score_delta:+.4f})",
    ]

    if idx in summaries_by_num:
        lines.append(f"\n#### Summary\n{summaries_by_num[idx]}\n")
    else:
        lines.extend(["\n#### Solution", f"```{language}", solution, "```\n"])

    artifact_section = format_artifacts(program, heading="####")
    if artifact_section:
        lines.append(artifact_section)

    return lines
|
|
|
|
def format_search_algorithms(
    other_context_programs: Union[List[Program], Dict[str, List[Program]]],
    language: str,
    summaries_by_num: Optional[Dict[int, str]] = None,
) -> str:
    """Render the reference search algorithms as markdown sections.

    A list is rendered under a single "Other Reference Programs" heading.
    A dict gets one heading per label (an empty label falls back to that
    same title), with programs numbered continuously across all groups.

    Args:
        other_context_programs: Programs as a list or as ``label -> list``.
        language: Language tag for fenced solution blocks.
        summaries_by_num: Optional summaries keyed by program number.

    Returns:
        The joined markdown, or ``""`` when there are no programs.
    """
    if not other_context_programs:
        return ""

    summaries = summaries_by_num or {}

    if not isinstance(other_context_programs, dict):
        chunks = ["## Other Reference Programs\n"]
        for number, program in enumerate(other_context_programs, start=1):
            chunks += format_single_program_section(program, number, language, summaries)
        return "\n".join(chunks)

    chunks = []
    number = 0  # last program number handed out so far, shared by all groups
    for group_label, group in other_context_programs.items():
        heading = group_label or "Other Reference Programs"
        chunks.append(f"\n## {heading}\n")
        chunks.append("Diverse search programs that may inspire new ideas:\n")
        # Continue numbering where the previous group stopped.
        for number, program in enumerate(group or [], start=number + 1):
            chunks += format_single_program_section(program, number, language, summaries)

    return "\n".join(chunks)
|
|
|
|
def parse_batch_summaries(response: str, programs_data: List[Dict]) -> Dict[int, str]:
    """Parse batch summary response into individual summaries by program number.

    Each program's summary is the text after the first occurrence of its
    ``[PROGRAM <num>]`` marker, up to the nearest following marker of any
    other listed program (or the end of the response), with surrounding
    whitespace stripped.  Markers may appear in any order.  A marker
    directly followed by another marker yields an empty summary.

    Args:
        response: Raw text containing ``[PROGRAM <num>]`` markers.
        programs_data: Dicts that each carry a ``"program_num"`` key.

    Returns:
        Summaries keyed by program number.  If no marker is found at all,
        the whole response is assigned to the first program.  Empty when
        ``response`` or ``programs_data`` is empty.
    """
    summaries: Dict[int, str] = {}
    if not response or not programs_data:
        return summaries

    # Locate every marker once: program number -> (marker start, text start).
    marker_spans = {}
    for prog in programs_data:
        num = prog["program_num"]
        marker = f"[PROGRAM {num}]"
        pos = response.find(marker)
        if pos != -1:
            marker_spans[num] = (pos, pos + len(marker))

    for num, (_, text_start) in marker_spans.items():
        # ``>=`` so a marker sitting immediately after this one still ends the
        # summary instead of leaking the next program's marker and text.
        text_end = min(
            (
                other_pos
                for other_num, (other_pos, _) in marker_spans.items()
                if other_num != num and other_pos >= text_start
            ),
            default=len(response),
        )
        summaries[num] = response[text_start:text_end].strip()

    if not summaries:
        # No markers at all: treat the response as one summary for the first program.
        summaries[programs_data[0]["program_num"]] = response
    return summaries
|
|