"""
Universal GEPA adapter for user-defined metrics and LLM clients.
"""
from .base_adapter import BaseGepaAdapter
from ..data.converters import UniversalConverter
from typing import Any, Dict, List, Optional
import logging
import re
from gepa.core.adapter import EvaluationBatch
logger = logging.getLogger(__name__)
class UniversalGepaAdapter(BaseGepaAdapter):
"""
Universal GEPA adapter that works with any LLM client and evaluator.
This adapter uses the existing UniversalConverter for data processing
and delegates LLM generation and evaluation to user-provided components.
Features:
- Optimized multi-variation JSON generation (66% cost reduction)
- Robust parsing with multiple fallback strategies
- Automatic fallback to sequential generation if JSON parsing fails
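Example (illustrative sketch; MyLLMClient and MyEvaluator stand in for
user-defined subclasses of BaseLLMClient / BaseEvaluator):

    adapter = UniversalGepaAdapter(llm_client=MyLLMClient(), evaluator=MyEvaluator())
    result = adapter.evaluate(
        batch=[{'input': 'task description', 'output': 'expected answer'}],
        candidate={'system_prompt': 'You are a helpful assistant...'},
    )
    print(result.scores)  # one composite score per batch item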
"""
# Fallback system prompt for sequential generation (when JSON parsing fails)
_FALLBACK_SYSTEM_PROMPT = """You are an expert prompt engineer specializing in iterative prompt optimization.
Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate an IMPROVED version of the prompt that addresses all identified issues.
Core Requirements:
1. OUTPUT ONLY the improved prompt text (no explanations, no analysis, no meta-commentary)
2. START directly with the prompt (e.g., "You are a mobile GUI agent..." or similar task-appropriate opening)
3. PRESERVE the core task domain and output format requirements
4. INTEGRATE improvements from feedback naturally into the prompt structure
5. MAINTAIN clarity, specificity, and actionability
Quality Standards:
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure the prompt is self-contained and unambiguous
DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)
Output the improved prompt directly and only the prompt."""
def __init__(self, llm_client, evaluator, data_converter=None, llego_layer=None):
"""
Initialize universal adapter.
Args:
llm_client: User-provided LLM client (must inherit from BaseLLMClient)
evaluator: User-provided evaluator (must inherit from BaseEvaluator)
data_converter: Optional custom data converter (uses UniversalConverter by default)
llego_layer: Optional LLEGO integration layer for genetic operations
"""
# Store LLEGO layer first
self.llego = llego_layer
# If LLEGO is provided, wrap the LLM client
# Note: If config is passed separately, it will be handled by optimizer
if llego_layer is not None:
from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
# Only wrap if not already wrapped (optimizer may have wrapped it with config)
if not isinstance(llm_client, LLEGOEnhancedLLMClient):
# Wrap before calling super().__init__
# Config will be set later by optimizer if hybrid mode is enabled
llm_client = LLEGOEnhancedLLMClient(llm_client, llego_layer, config=None, verbose=True)
else:
# Already wrapped, but update config if available
if hasattr(llm_client, 'config') and llm_client.config is None:
# Config will be set by optimizer later
pass
# Initialize parent (this sets up self.logger)
super().__init__(llm_client, evaluator)
# Use existing UniversalConverter for data processing
self.data_converter = data_converter or UniversalConverter()
# 🔥 NEW: Initialize optimization state tracking
self._is_baseline_evaluation = False # Flag to distinguish baseline vs optimization
self._last_candidate = None # Track last candidate to detect changes
self._gepa_iteration = 0 # Track actual GEPA iteration (not evaluation count)
# Track candidates for logging
self._evaluation_count = 0
# Track current evaluation context
self._current_evaluation_type = None # 'seed', 'gepa_reflection', 'llego_crossover', 'llego_mutation'
self._current_dataset_type = None # 'dfeedback' or 'dpareto'
self._baseline_score = None # Store baseline score for comparison
# Track candidate sources by prompt text (in case GEPA doesn't pass source field)
self._candidate_sources = {} # Maps prompt_text -> source_type
# Track validation set size for better dataset type detection
self._valset_size = None # Will be set by optimizer
self._valset = None # Will be set by optimizer - stores actual valset for Dpareto evaluation
# 🔥 CRITICAL: Track which candidates have been evaluated on Dpareto to avoid double evaluation
# Key: normalized prompt text, Value: (fitness_score, candidate_type, timestamp)
self._dpareto_evaluated_candidates = {} # Maps prompt -> (score, type)
# 🔥 HYBRID MODE: Storage for generated candidates
self._generated_candidates = [] # Store hybrid mode candidates
self._candidate_generation_active = False # Track if we're generating candidates
self._config = None # Will be set by optimizer if hybrid mode enabled
self._reflection_lm_client = None # Will be set by optimizer
# 🔥 FORMAT AWARENESS: Store detected output format for better prompts
self._detected_format = None # Will be populated from expected outputs
self._format_detection_done = False # Only detect once
# Log initialization
model_info = llm_client.get_model_info()
if llego_layer is not None:
self.logger.info(f"Initialized Universal adapter with {model_info}")
self.logger.info(f"🧬 LLEGO integration ENABLED - LLM client is wrapped for genetic operations")
else:
self.logger.info(f"Initialized Universal adapter with {model_info}")
def _clean_llm_output(self, output: str) -> str:
"""
🔥 CRITICAL: Clean LLM output before evaluation.
LLMs often wrap JSON/structured output in markdown code blocks.
This causes evaluation to fail because the evaluator sees:
"```json\n{\"key\": \"value\"}\n```"
Instead of:
"{\"key\": \"value\"}"
This method extracts the clean content for fair comparison.
"""
if not output or not isinstance(output, str):
return output
cleaned = output.strip()
# Remove markdown code blocks (```json ... ``` or ``` ... ```)
code_block_match = re.search(r'```(?:json|JSON)?\s*([\s\S]*?)\s*```', cleaned)
if code_block_match:
extracted = code_block_match.group(1).strip()
# Only use extracted if it looks like valid content
if extracted and (extracted.startswith('{') or extracted.startswith('[') or len(extracted) > 10):
self.logger.debug(f"📦 Cleaned markdown code block from LLM output")
return extracted
# Remove leading/trailing markdown artifacts
# Handle cases like "Here is the JSON:\n```json\n...\n```"
if '```' in cleaned:
# Try to extract content between first ``` and last ```
parts = cleaned.split('```')
if len(parts) >= 3:
# Content is in the middle part(s)
middle_content = parts[1]
# Remove language tag if present (e.g., "json\n")
middle_content = re.sub(r'^(?:json|JSON|python|text)\s*\n?', '', middle_content).strip()
if middle_content:
return middle_content
return cleaned
def _detect_and_cache_format(self, batch: List[Dict[str, Any]]) -> None:
"""
Detect output format from expected outputs and cache for future use.
This enables format-aware prompting and feedback generation.
"""
try:
from ..utils.format_detection import detect_output_format
# Extract expected outputs from batch
expected_outputs = []
for item in batch:
# Try to extract output directly, or standardize if needed
output = None
if isinstance(item, dict):
# Try common output field names first
output = item.get('output') or item.get('expected_output') or item.get('result') or item.get('answer')
if not output:
# Standardize using converter's private method (same as _evaluate_batch_mode)
try:
standardized = self.data_converter._standardize([item])[0]
output = standardized.get('output')
except Exception:
pass
if output and isinstance(output, str) and output.strip():
expected_outputs.append(output)
if expected_outputs:
self._detected_format = detect_output_format(expected_outputs)
self.logger.info(f"FORMAT DETECTED: {self._detected_format['format_type']}")
self.logger.info(f" Spec: {self._detected_format['format_spec'][:100]}...")
self.logger.info(f" Avg length: {self._detected_format['avg_length']} chars")
# Debug logging removed - not needed in production
else:
self.logger.warning("⚠️ No expected outputs found for format detection")
self._detected_format = None
# Debug logging removed - not needed in production
except Exception as e:
self.logger.warning(f"⚠️ Format detection failed: {e}")
self._detected_format = None
def evaluate(self, batch: List[Dict[str, Any]], candidate: Dict[str, str],
capture_traces: bool = False) -> EvaluationBatch:
"""
Evaluate candidates using user-provided LLM client and evaluator.
This method automatically detects BatchLLMClient and uses batch processing
for cost savings, or falls back to standard individual processing.
This method works with any data type supported by UniversalConverter.
🔥 IMPORTANT: We only optimize system_prompt, NOT user_prompt.
The user_prompt varies per tester and is not part of optimization.
🔥 CACHING: Seed prompt is evaluated ONLY ONCE on Dpareto (validation set).
Subsequent evaluations return cached result to save API calls and ensure consistency.
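Returns:
An EvaluationBatch whose outputs and scores lists run parallel to batch;
trajectories is populated only when capture_traces=True (cache hits always
return trajectories=None).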
"""
system_prompt = candidate.get('system_prompt', '')
# 🔥 FORMAT DETECTION: Detect output format from expected outputs (once)
if not self._format_detection_done and batch:
self._detect_and_cache_format(batch)
self._format_detection_done = True
# Determine dataset type first (needed for cache check)
batch_size_threshold = self._config.batch_size if hasattr(self, '_config') and self._config else 8
# 🔥 CRITICAL FIX: If capture_traces=True, this is a TRAINING MINIBATCH for reflection
# GEPA calls with capture_traces=True when it needs trajectories for reflective mutation
# We must NEVER use cache in this case, otherwise trajectories=None breaks GEPA!
if capture_traces:
dataset_type = 'dfeedback' # Training minibatch - need fresh evaluation with trajectories
self.logger.info(f"🎯 evaluate() called with capture_traces=True - this is a TRAINING MINIBATCH")
self.logger.info(f" Batch size: {len(batch)}, Will generate trajectories for reflection")
# If _is_baseline_evaluation is True, we KNOW this is the validation set
elif hasattr(self, '_is_baseline_evaluation') and self._is_baseline_evaluation:
dataset_type = 'dpareto' # Baseline is ALWAYS evaluated on validation set
self.logger.debug(f"🎯 Forced dataset_type to 'dpareto' (baseline evaluation flag is True)")
elif hasattr(self, '_valset_size') and self._valset_size is not None and len(batch) == self._valset_size:
# 🔥 FIX: Use == not >= to avoid misclassifying training minibatches as validation set
dataset_type = 'dpareto' # EXACT validation set size = Dpareto
elif len(batch) > batch_size_threshold * 1.5:
dataset_type = 'dpareto' # Much larger than batch = likely full valset
else:
dataset_type = 'dfeedback' # Small batch = training minibatch for reflection
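# Worked example of the heuristic above (illustrative numbers): with
# batch_size=8 and a valset of 50 items, a 50-item batch -> 'dpareto'
# (exact valset match), a 20-item batch -> 'dpareto' (20 > 8 * 1.5 = 12),
# and an 8-item batch -> 'dfeedback' (training minibatch).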
# 🔥 CRITICAL: Check cache to avoid re-evaluating same prompt on Dpareto
# This ensures seed prompt is evaluated ONLY ONCE
# NOTE: Only applies when capture_traces=False (validation set evaluation)
if dataset_type == 'dpareto' and not capture_traces:
normalized_prompt = system_prompt.strip().strip('"\'')
if normalized_prompt in self._dpareto_evaluated_candidates:
existing_score, existing_type, _ = self._dpareto_evaluated_candidates[normalized_prompt]
self.logger.info(
f"♻️ CACHE HIT: Prompt already evaluated on Dpareto "
f"(score={existing_score:.4f}, type={existing_type}) - skipping re-evaluation"
)
# Return cached result - create EvaluationBatch with cached score
cached_outputs = [f"[CACHED: {existing_type}]"] * len(batch)
cached_scores = [existing_score] * len(batch)
# Still update baseline if this is seed and baseline not set
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
if existing_type == 'seed' and self._baseline_score is None:
self._baseline_score = existing_score
pareto_log.set_baseline(existing_score)
self.logger.info(f"Baseline score set from cache: {existing_score:.4f}")
# Log to Pareto logger (for tracking, but no re-evaluation)
pareto_log.log_candidate_evaluation(
prompt=system_prompt,
score=existing_score,
candidate_type=existing_type,
dataset_type='dpareto'
)
return EvaluationBatch(
outputs=cached_outputs,
scores=cached_scores,
trajectories=None # No traces for cached results
)
# Determine candidate type
# Priority order:
# 1. Check candidate dict for 'source' field (from LLM wrapper)
# 2. Check _candidate_sources mapping (from previous evaluations)
# 3. Check _current_evaluation_type (from log_proposed_candidate)
# 4. Infer from context (seed, repeat, etc.)
candidate_type = candidate.get('source') # First try candidate dict
if not candidate_type or candidate_type == 'unknown':
candidate_type = self._candidate_sources.get(system_prompt) # Check mapping
if not candidate_type or candidate_type == 'unknown':
candidate_type = self._current_evaluation_type # Use stored type
if not candidate_type or candidate_type == 'unknown':
# Try to infer from prompt or metadata
if system_prompt == self._last_candidate:
candidate_type = 'repeat' # Same prompt being re-evaluated
elif self._evaluation_count == 0 or 'seed' in str(candidate.get('source', '')).lower():
candidate_type = 'seed' # Explicitly mark as seed
self.logger.debug("🌱 Detected seed prompt (S₀)")
else:
candidate_type = 'unknown' # Truly unknown
# Debug logging removed - not needed in production
# Store source for future lookups (always update if we found a valid type)
if candidate_type and candidate_type != 'unknown' and system_prompt not in self._candidate_sources:
self._candidate_sources[system_prompt] = candidate_type
self.logger.debug(f" Stored candidate type: {candidate_type} for prompt (length: {len(system_prompt)})")
# Dataset type already determined above for cache check - reuse it
# Debug logging removed - not needed in production
# Check if this is a new candidate (different from last one)
if self._last_candidate != system_prompt:
self._evaluation_count += 1
# 🔥 CRITICAL: If this is baseline evaluation, force candidate_type to 'seed'
if self._is_baseline_evaluation:
candidate_type = 'seed'
self.logger.debug(f"🌱 Baseline evaluation detected - setting candidate_type to 'seed'")
self._current_evaluation_type = candidate_type
self._current_dataset_type = dataset_type
self._last_candidate = system_prompt
# Minimal logging - just track what we're evaluating
if self._is_baseline_evaluation:
self.logger.debug(f"Evaluating baseline (S₀) on {dataset_type}")
else:
self.logger.debug(f"Evaluating candidate #{self._evaluation_count} ({candidate_type}) on {dataset_type}")
# Detect and use batch mode if available
from ..llms.batch_llm import BatchLLMClient
is_batch_mode = isinstance(self.llm_client, BatchLLMClient)
if is_batch_mode:
outputs, scores, trajectories = self._evaluate_batch_mode(
batch, system_prompt, capture_traces
)
else:
outputs, scores, trajectories = self._evaluate_standard_mode(
batch, system_prompt, capture_traces
)
avg_score = sum(scores) / len(scores) if scores else 0.0
# Debug logging removed - not needed in production
# 🔥 CRITICAL FIX: Baseline MUST be set from seed's first Dpareto evaluation ONLY
# This ensures FAIR comparison: seed and candidates evaluated on SAME dataset (Dpareto) with SAME number of datapoints
#
# Fair evaluation requires:
# - Seed baseline: Dpareto (validation set) - first evaluation during optimization
# - Candidates: Dpareto (validation set) - same dataset, same size
# - Same conditions = fair comparison ✅
#
# We IGNORE test set for baseline - baseline must come from Dpareto to ensure same dataset/size
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
# 🔥 FIX: Check if this is baseline evaluation AND dpareto - set baseline with priority
is_baseline_eval = hasattr(self, '_is_baseline_evaluation') and self._is_baseline_evaluation
if self._baseline_score is None:
# 🔥 FIX B: Set baseline on FIRST Dpareto evaluation, regardless of candidate type
# Also set baseline if this is explicitly marked as baseline evaluation
if self._current_dataset_type == 'dpareto' or is_baseline_eval:
# ✅ PRIMARY: Set baseline from FIRST Dpareto evaluation (seed or first candidate)
self._baseline_score = avg_score
pareto_log.set_baseline(avg_score)
self.logger.info(f"Baseline score (Dpareto, {len(batch)} samples): {avg_score:.4f}")
self.logger.info(f" ✅ Baseline set from {'baseline evaluation' if is_baseline_eval else 'first Dpareto'} (type: {self._current_evaluation_type})")
# Debug logging removed - not needed in production
# Note: Test set evaluations are ignored for baseline - baseline comes from Dpareto
else:
# 🔥 SAFETY CHECK: Ensure Pareto logger also has baseline if adapter has it
# This handles the case where optimizer set baseline in adapter but Pareto logger wasn't updated
if (self._current_dataset_type == 'dpareto' or is_baseline_eval) and pareto_log.baseline_score is None:
pareto_log.set_baseline(self._baseline_score)
self.logger.info(f"✅ Synchronized baseline in Pareto logger: {self._baseline_score:.4f}")
# Track Dpareto evaluations for Pareto front
if self._current_dataset_type == 'dpareto':
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
pareto_log.log_candidate_evaluation(
prompt=system_prompt,
score=avg_score,
candidate_type=self._current_evaluation_type or 'unknown',
dataset_type=self._current_dataset_type
)
# Track evaluated candidates
normalized_prompt = system_prompt.strip().strip('"\'')
if normalized_prompt not in self._dpareto_evaluated_candidates:
self._dpareto_evaluated_candidates[normalized_prompt] = (
avg_score, self._current_evaluation_type or 'unknown', 'evaluated_by_gepa'
)
self.logger.debug(f"Evaluation complete: score={avg_score:.4f}")
# 🔥 CRITICAL: Update _best_candidate and _best_score with average fitness for Dpareto evaluations
# This ensures the adapter tracks the best average fitness, not just per-sample scores
# Only update if this score is better than current best
if self._current_dataset_type == 'dpareto':
if self._best_score is None or avg_score > self._best_score:
self._best_score = avg_score
self._best_candidate = {
'system_prompt': system_prompt,
'fitness': avg_score,
'source': self._current_evaluation_type or 'unknown'
}
self.logger.info(f"✅ Updated best candidate from Dpareto evaluation: f={avg_score:.4f} (type: {self._current_evaluation_type})")
return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajectories)
def _evaluate_batch_mode(
self,
batch: List[Dict],
system_prompt: str,
capture_traces: bool
) -> tuple:
"""
Batch mode evaluation - process all samples in one API call.
This method prepares all requests, submits them as a batch job to Gemini,
waits for completion, then evaluates all results.
"""
# Prepare all requests
requests = []
standardized_items = []
for item in batch:
standardized_item = self.data_converter._standardize([item])[0]
standardized_items.append(standardized_item)
request = {
'system_prompt': system_prompt,
'user_prompt': standardized_item['input']
}
if standardized_item.get('image'):
request['image_base64'] = standardized_item['image']
requests.append(request)
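# Illustrative shape of each request consumed by generate_batch():
# {'system_prompt': '...', 'user_prompt': '...', 'image_base64': '<b64>'}
# (the image_base64 key is only present for multi-modal samples)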
# Submit batch job and get all results at once
batch_results = self.llm_client.generate_batch(requests)
# Process results
outputs = []
scores = []
trajectories = [] if capture_traces else None
for i, (llm_response, standardized_item) in enumerate(zip(batch_results, standardized_items)):
# Extract content
raw_output = llm_response.get("content", "")
# 🔥 CRITICAL: Clean markdown wrappers before evaluation
predicted_output = self._clean_llm_output(raw_output)
outputs.append(predicted_output)
# Evaluate with cleaned output
evaluation_results = self.evaluator.evaluate(
predicted_output,
standardized_item['output']
)
composite_score = evaluation_results.get("composite_score", 0.0)
scores.append(composite_score)
# Update tracking
if self._best_score is None or composite_score > self._best_score:
self._best_score = composite_score
self._best_candidate = {'system_prompt': system_prompt}
# Capture traces
if capture_traces:
trajectories.append({
'input_data': standardized_item,
'predicted_output': predicted_output,
'evaluation_results': evaluation_results
})
# Concise logging with element IDs and candidate notation
predicted_element = evaluation_results.get('predicted_element', '?')
expected_element = evaluation_results.get('expected_element', '?')
status = "✅" if composite_score == 1.0 else "❌"
# Add notation for candidate type
notation_map = {'seed': 'S₀', 'gepa_reflection': 'Sᵣ', 'llego_crossover': 'Oₓₒ', 'llego_mutation': 'Oₘᵤₜ'}
notation = notation_map.get(self._current_evaluation_type, 'S')
self.logger.info(f" [{notation}] Sample {i+1}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status}")
return outputs, scores, trajectories
def _evaluate_standard_mode(
self,
batch: List[Dict],
system_prompt: str,
capture_traces: bool
) -> tuple:
"""
Standard mode evaluation - process samples individually (existing logic).
This is the original implementation, preserved for backward compatibility
and for use with non-batch LLM clients.
"""
outputs = []
scores = []
trajectories = [] if capture_traces else None
for i, item in enumerate(batch):
# Use existing data processing logic
standardized_item = self.data_converter._standardize([item])[0]
# Prepare generation parameters
generation_params = {
'system_prompt': system_prompt,
'user_prompt': standardized_item['input']
}
# Add image if present
if standardized_item.get('image'):
generation_params['image_base64'] = standardized_item['image']
# Generate response using user's LLM client
llm_response = self.llm_client.generate(**generation_params)
# Extract content
if isinstance(llm_response, dict):
raw_output = llm_response.get("content", "")
else:
raw_output = str(llm_response)
# 🔥 CRITICAL: Clean markdown wrappers before evaluation
predicted_output = self._clean_llm_output(raw_output)
outputs.append(predicted_output)
# Evaluate using user's evaluator with cleaned output
evaluation_results = self.evaluator.evaluate(
predicted_output,
standardized_item['output']
)
composite_score = evaluation_results.get("composite_score", 0.0)
scores.append(composite_score)
# Debug logging removed - not needed in production
# Update performance tracking
self._evaluation_count += 1
if self._best_score is None or composite_score > self._best_score:
self._best_score = composite_score
self._best_candidate = {'system_prompt': system_prompt}
# Capture traces if requested
if capture_traces:
trajectories.append({
'input_data': standardized_item,
'predicted_output': predicted_output,
'evaluation_results': evaluation_results
})
# Concise logging with element IDs and candidate notation
predicted_element = evaluation_results.get('predicted_element', '?')
expected_element = evaluation_results.get('expected_element', '?')
status = "✅" if composite_score == 1.0 else "❌"
# Add notation for candidate type
notation_map = {'seed': 'S₀', 'gepa_reflection': 'Sᵣ', 'llego_crossover': 'Oₓₒ', 'llego_mutation': 'Oₘᵤₜ'}
notation = notation_map.get(self._current_evaluation_type, 'S')
self.logger.info(f" [{notation}] Sample {i+1}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status}")
return outputs, scores, trajectories
def make_reflective_dataset(self, candidate: Dict[str, str], eval_batch: EvaluationBatch,
components_to_update: List[str]) -> Dict[str, List[Dict[str, Any]]]:
"""
Create reflective dataset using user-provided evaluator.
This method generates feedback based on the evaluation results
from the user's custom evaluator.
🔥 NEW: If hybrid mode is enabled, this method ALSO generates hybrid candidates
(GEPA Reflection + LLEGO Operators) and stores them for GEPA to use.
"""
# 🔥 DEBUG: Log that this method is being called (CRITICAL for debugging)
self.logger.info(f"\n{'='*80}")
self.logger.info(f"🔥 make_reflective_dataset() CALLED BY GEPA")
self.logger.info(f"{'='*80}")
self.logger.info(f" Candidate prompt: {candidate.get('system_prompt', '')[:100]}...")
self.logger.info(f" Eval batch has trajectories: {bool(eval_batch.trajectories)}")
self.logger.info(f" Eval batch scores: {eval_batch.scores if eval_batch.scores else 'None'}")
self.logger.info(f" Components to update: {components_to_update}")
reflective_dataset = {}
system_prompt = candidate.get('system_prompt', '')
# 🔥 REMOVED: Verbose diagnostic checks - only log if hybrid mode is actually enabled
hybrid_mode_enabled = (self._config and
hasattr(self._config, 'enable_gepa_reflection_with_llego') and
self._config.enable_gepa_reflection_with_llego and
self._reflection_lm_client)
if hybrid_mode_enabled:
self.logger.debug(f"✅ Hybrid mode conditions met - will generate hybrid candidates")
# ========================================================================
# 🔥 CRITICAL FIX: Update LLEGO population with evaluated candidate
# ========================================================================
# This is the MISSING LINK! After a candidate is evaluated, we need to add it
# to the LLEGO population so it can be used for crossover/mutation.
# Without this, the population only contains the seed, so Pareto front stays at 1!
#
# This is called for EVERY candidate that GEPA evaluates:
# - Seed prompt (baseline) → added to population
# - New candidate 1 (from reflection/crossover/mutation) → added to population
# - New candidate 2 → added to population
# - etc.
if self.llego:
# Calculate average fitness from evaluation scores
if eval_batch.scores and len(eval_batch.scores) > 0:
avg_fitness = sum(eval_batch.scores) / len(eval_batch.scores)
else:
# Fallback: extract from trajectories if scores not available
scores = [t.get('evaluation_results', {}).get('composite_score', 0.0)
for t in eval_batch.trajectories if 'evaluation_results' in t]
avg_fitness = sum(scores) / len(scores) if scores else 0.0
self.logger.debug(f"Updating LLEGO population: fitness={avg_fitness:.4f}")
# Create PromptCandidate from evaluated prompt
from ..operators.llego_operators import PromptCandidate
# Check if this candidate already exists in population (avoid duplicates)
# 🔥 FIX: Normalize prompts for comparison (strip whitespace, remove quotes)
normalized_new_prompt = system_prompt.strip().strip('"\'')
existing_prompts = {p.prompt.strip().strip('"\'') for p in self.llego.population}
# Also check normalized versions
if normalized_new_prompt not in existing_prompts:
prompt_candidate = PromptCandidate(
prompt=system_prompt, # Keep original prompt (not normalized)
fitness=avg_fitness,
metadata={
'generation': self.llego.current_generation,
'operator': 'evaluated',
'prompt_length': len(system_prompt),
'word_count': len(system_prompt.split()),
'evaluation_samples': len(eval_batch.scores) if eval_batch.scores else 0,
'candidate_type': self._current_evaluation_type or 'unknown', # Store type for notation
'dataset_evaluated': self._current_dataset_type or 'unknown'
}
)
# Update population - this will add the candidate and keep top N by fitness
population_before = len(self.llego.population)
self.llego.update_population([prompt_candidate])
population_after = len(self.llego.population)
self.logger.debug(f"Added to LLEGO population: fitness={avg_fitness:.4f}, size: {population_before} → {population_after}")
else:
# Update fitness if candidate already exists (seed prompt, etc.)
# 🔥 FIX: Also normalize for comparison
updated = False
for p in self.llego.population:
normalized_existing = p.prompt.strip().strip('"\'')
if normalized_existing == normalized_new_prompt:
old_fitness = p.fitness
if avg_fitness > p.fitness:
p.fitness = avg_fitness
updated = True
self.logger.debug(f"Updated fitness: {old_fitness:.4f} → {avg_fitness:.4f}")
# Update candidate type if we have new information
if self._current_evaluation_type and p.metadata:
old_type = p.metadata.get('candidate_type', 'unknown')
if self._current_evaluation_type != old_type:
p.metadata['candidate_type'] = self._current_evaluation_type
else:
self.logger.debug(f"ℹ️ Candidate already exists with better/equal fitness: {p.fitness:.4f} >= {avg_fitness:.4f}")
break
if not updated:
self.logger.debug(f"Candidate already in population with higher fitness")
else:
self.logger.debug("LLEGO not initialized - skipping population update")
# ========================================================================
# 🔥 HYBRID MODE: Generate candidates at adapter level
# ========================================================================
if (self._config and
hasattr(self._config, 'enable_gepa_reflection_with_llego') and
self._config.enable_gepa_reflection_with_llego and
self._reflection_lm_client):
self.logger.debug("Generating hybrid candidates")
# Generate hybrid candidates FIRST
generated_candidates = self._generate_hybrid_candidates_adapter_level(
current_prompt=system_prompt,
eval_batch=eval_batch,
candidate=candidate
)
# 🔥 CRITICAL: Store generated candidates so we can inject them
# _generate_hybrid_candidates_adapter_level now returns list of dicts with metadata
if generated_candidates:
candidate_dicts = []
for cand in generated_candidates:
if isinstance(cand, dict) and 'prompt' in cand:
# Already a dict with metadata (preferred format)
candidate_dicts.append(cand)
elif isinstance(cand, str):
# Just a string - determine source based on position (fallback)
# This shouldn't happen if _generate_hybrid_candidates_adapter_level is fixed
self.logger.warning(f"⚠️ Received string candidate instead of dict - using fallback logic")
if len(candidate_dicts) < self._config.num_gepa_reflection_candidates:
source = 'gepa_reflection'
elif len(candidate_dicts) < self._config.num_gepa_reflection_candidates + self._config.n_crossover:
source = 'llego_crossover'
else:
source = 'llego_mutation'
candidate_dicts.append({
'prompt': cand,
'source': source,
'index': len(candidate_dicts) + 1
})
else:
self.logger.warning(f"⚠️ Unknown candidate format: {type(cand)}")
self._generated_candidates = candidate_dicts
# Store candidate sources for tracking
for cand_dict in candidate_dicts:
if 'prompt' in cand_dict and 'source' in cand_dict:
self._candidate_sources[cand_dict['prompt']] = cand_dict['source']
# 🔥 CRITICAL: Inject into LLM client wrapper so it can return them when GEPA calls
# This is the key mechanism: when GEPA calls adapter.llm_client.generate() for proposals,
# our wrapper will detect it and return our pre-generated candidates
if hasattr(self.llm_client, '_adapter_generated_candidates'):
self.llm_client._adapter_generated_candidates = candidate_dicts.copy()
self.logger.debug(f"Injected {len(candidate_dicts)} candidates")
else:
try:
self.llm_client._adapter_generated_candidates = candidate_dicts.copy()
except Exception as e:
self.logger.error(f"Failed to inject candidates: {e}")
# Evaluate generated candidates on Dpareto for fair comparison
if hasattr(self, '_evaluating_generated_candidates'):
pass # Skip to prevent recursion
elif self._valset and len(self._valset) > 0:
self._evaluating_generated_candidates = True
self.logger.debug(f"Evaluating {len(candidate_dicts)} candidates on Dpareto ({len(self._valset)} samples)")
# 🔥 NEW: Collect all candidates with scores for batch update
candidates_with_scores = []
for i, cand_dict in enumerate(candidate_dicts, 1):
cand_prompt = cand_dict.get('prompt', '')
cand_source = cand_dict.get('source', 'unknown')
if not cand_prompt:
continue
# Normalize prompt for duplicate detection
normalized_prompt = cand_prompt.strip().strip('"\'')
# Check if already evaluated on Dpareto (avoid double evaluation)
if normalized_prompt in self._dpareto_evaluated_candidates:
existing_score, existing_type, _ = self._dpareto_evaluated_candidates[normalized_prompt]
# Still add to batch for Pareto update (with existing score)
notation_map = {
'seed': 'S₀',
'gepa_reflection': 'Sᵣ',
'llego_crossover': 'Oₓₒ',
'llego_mutation': 'Oₘᵤₜ'
}
cand_notation = notation_map.get(cand_source, 'S')
candidates_with_scores.append({
'prompt': cand_prompt,
'score': existing_score,
'type': cand_source,
'notation': cand_notation
})
continue
# Evaluate this candidate on valset (Dpareto)
try:
# Set candidate type for proper logging
self._current_evaluation_type = cand_source
# 🔥 CRITICAL: Temporarily disable individual Pareto updates
# We'll do batch update after all evaluations
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
original_log_method = pareto_log.log_candidate_evaluation
# Temporarily replace to prevent individual updates
def noop_log(*args, **kwargs):
pass # Skip individual logging - we'll batch update later
pareto_log.log_candidate_evaluation = noop_log
# Evaluate on valset - THIS IS THE FAIR EVALUATION ON SAME DATASET
valset_eval = self.evaluate(
batch=self._valset, # Same valset as seed!
candidate={'system_prompt': cand_prompt, 'source': cand_source},
capture_traces=True
)
# Restore original method
pareto_log.log_candidate_evaluation = original_log_method
avg_score = sum(valset_eval.scores) / len(valset_eval.scores) if valset_eval.scores else 0.0
# Store evaluation result to avoid double evaluation
self._dpareto_evaluated_candidates[normalized_prompt] = (
avg_score,
cand_source,
'evaluated_in_make_reflective_dataset'
)
self.logger.debug(f"Candidate {i} evaluated: score={avg_score:.4f}")
# Generate notation
notation_map = {
'seed': 'S₀',
'gepa_reflection': 'Sᵣ',
'llego_crossover': 'Oₓₒ',
'llego_mutation': 'Oₘᵤₜ'
}
cand_notation = notation_map.get(cand_source, 'S')
# Add to batch for Pareto update
candidates_with_scores.append({
'prompt': cand_prompt,
'score': avg_score,
'type': cand_source,
'notation': cand_notation
})
# 🔥 CRITICAL: Explicitly add this candidate to LLEGO population with Dpareto fitness
if self.llego:
from ..operators.llego_operators import PromptCandidate
# Check if already in population
existing_in_pop = False
for p in self.llego.population:
if p.prompt.strip().strip('"\'') == normalized_prompt:
# Update fitness if this Dpareto score is better
if avg_score > p.fitness:
old_fitness = p.fitness
p.fitness = avg_score
if p.metadata:
p.metadata['candidate_type'] = cand_source
p.metadata['dataset_evaluated'] = 'dpareto'
self.logger.debug(f"Updated LLEGO fitness: {old_fitness:.4f} → {avg_score:.4f}")
existing_in_pop = True
break
if not existing_in_pop:
# Add new candidate to population
prompt_candidate = PromptCandidate(
prompt=cand_prompt,
fitness=avg_score,
metadata={
'generation': self.llego.current_generation,
'operator': 'evaluated_on_dpareto',
'prompt_length': len(cand_prompt),
'word_count': len(cand_prompt.split()),
'evaluation_samples': len(valset_eval.scores) if valset_eval.scores else 0,
'candidate_type': cand_source,
'dataset_evaluated': 'dpareto'
}
)
self.llego.update_population([prompt_candidate])
except Exception as e:
self.logger.error(f" ❌ Error evaluating candidate #{i} on Dpareto: {e}")
import traceback
self.logger.error(traceback.format_exc())
# Batch Pareto front update
if candidates_with_scores:
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
added_candidates = pareto_log.batch_update_pareto_front(candidates_with_scores)
# 🔥 CRITICAL: Update queue with scores for best-candidate selection
# Create a mapping of prompt -> score for quick lookup
prompt_to_score = {c['prompt'].strip().strip('"\''): c['score'] for c in candidates_with_scores}
# Update candidates in queue with their scores
if hasattr(self.llm_client, '_adapter_generated_candidates'):
updated_queue = []
for cand in self.llm_client._adapter_generated_candidates:
if isinstance(cand, dict):
cand_prompt = cand.get('prompt', '')
normalized = cand_prompt.strip().strip('"\'')
if normalized in prompt_to_score:
# Update with score
cand['score'] = prompt_to_score[normalized]
updated_queue.append(cand)
else:
updated_queue.append(cand)
else:
updated_queue.append(cand)
self.llm_client._adapter_generated_candidates = updated_queue
self.logger.debug(f"Pareto update: {len(added_candidates)} added, front size={len(pareto_log.pareto_front)}")
# Clear flag after evaluation complete
self._evaluating_generated_candidates = False
elif not hasattr(self, '_evaluating_generated_candidates'):
self.logger.error("Valset not available - cannot evaluate generated candidates")
# Signal LLEGO-enhanced client for reflection mode
if self.llego and hasattr(self.llm_client, 'set_reflection_context'):
self.llm_client.set_reflection_context(
current_prompt=system_prompt,
feedback=eval_batch,
in_reflection=True
)
# 🔥 CRITICAL: Also set reflection context on reflection_lm_client if it exists
# This ensures hybrid mode candidate generation is triggered when GEPA calls reflection_lm_callable
if hasattr(self, 'reflection_lm_client') and self.reflection_lm_client:
if hasattr(self.reflection_lm_client, 'set_reflection_context'):
self.logger.info("🔥 CRITICAL: Setting reflection context on reflection_lm_client for hybrid mode")
self.reflection_lm_client.set_reflection_context(
current_prompt=system_prompt,
feedback=eval_batch,
in_reflection=True # This enables hybrid candidate generation!
)
self._log_reflection_dataset_creation(candidate, eval_batch, components_to_update)
# Inject generated candidates into reflective dataset
suggested_prompts = []
if hasattr(self, '_generated_candidates') and self._generated_candidates:
suggested_prompts = [c['prompt'] for c in self._generated_candidates if isinstance(c, dict) and 'prompt' in c]
self.logger.debug(f"Injecting {len(suggested_prompts)} suggested prompts")
for component in components_to_update:
reflective_dataset[component] = []
for trace in eval_batch.trajectories:
# Generate feedback based on evaluation results
# Phase 2: Pass trace and current_prompt for LLM-as-Judge
feedback = self._generate_feedback(
trace['evaluation_results'],
trace=trace,
current_prompt=system_prompt
)
# Base reflection data
# 🔥 FIX: Strip image_base64 from input_data to prevent massive base64 strings in logs
input_data_clean = trace['input_data'].copy() if isinstance(trace['input_data'], dict) else {}
if 'image_base64' in input_data_clean:
input_data_clean['image_base64'] = f"[IMAGE_DATA_{len(input_data_clean['image_base64'])}_chars]"
# 🔥 FIX: Clean detailed_scores to remove any base64 references or large data
detailed_scores_clean = {}
if isinstance(trace['evaluation_results'], dict):
for key, value in trace['evaluation_results'].items():
# Skip any values that look like base64 (very long strings)
if isinstance(value, str) and len(value) > 1000:
detailed_scores_clean[key] = f"[DATA_{len(value)}_chars]"
else:
detailed_scores_clean[key] = value
else:
detailed_scores_clean = trace['evaluation_results']
reflection_entry = {
"current_prompt": system_prompt,
"input_data": input_data_clean, # Use cleaned version without full base64
"predicted_output": trace['predicted_output'],
"score": trace['evaluation_results'].get("composite_score", 0.0),
"feedback": feedback,
"detailed_scores": detailed_scores_clean # Cleaned scores without large data
}
# 🔥 CRITICAL: Only optimize system_prompt, NOT user_prompt
# The user_prompt contains the task description (command) and should NOT be modified
if component == 'system_prompt' and suggested_prompts:
# Add suggested improved prompts to the reflection entry
# GEPA might use these if the structure supports it
reflection_entry["suggested_improved_prompts"] = suggested_prompts
reflection_entry["num_suggestions"] = len(suggested_prompts)
# Also add the best suggested prompt as a direct suggestion
if suggested_prompts:
reflection_entry["suggested_prompt"] = suggested_prompts[0] # First candidate as primary suggestion
reflection_entry["optimize_component"] = "system_prompt_only" # Mark that we only optimize system_prompt
elif component != 'system_prompt':
# For non-system_prompt components (like user_prompt), do NOT add suggestions
# We only want to optimize system_prompt
reflection_entry["optimize_component"] = "skip" # Mark to skip optimization
self.logger.info(f"⚠️ Skipping optimization for component '{component}' - only optimizing system_prompt")
reflective_dataset[component].append(reflection_entry)
total_samples = sum(len(data) for data in reflective_dataset.values())
avg_score = sum(trace['score'] for data in reflective_dataset.values() for trace in data) / total_samples if total_samples > 0 else 0.0
self.logger.info(f"Reflection dataset created - {total_samples} samples, avg score: {avg_score:.4f}")
return reflective_dataset
def _generate_feedback(
self,
evaluation_results: Dict[str, Any],
trace: Optional[Dict[str, Any]] = None,
current_prompt: Optional[str] = None
) -> str:
"""
Generate feedback using hybrid approach:
- LLM-as-Judge for low/medium scores (detailed, actionable)
- Simple feedback for high scores (efficient)
Args:
evaluation_results: Evaluation scores and extracted data
trace: Full trace with input_data, predicted_output, etc. (optional)
current_prompt: The current system prompt being optimized (optional)
Returns:
Feedback string focused on prompt improvement
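Example (illustrative, with the default llm_as_judge_threshold of 0.8):
a sample scoring 0.55 that has a trace and an available reflection LM is
routed to LLM-as-Judge; a sample scoring 0.92 gets the cheaper simple feedback.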
"""
composite_score = evaluation_results.get("composite_score", 0.0)
# Check if LLM-as-Judge is enabled
use_llm_judge = getattr(self._config, 'use_llm_as_judge', True)
threshold = getattr(self._config, 'llm_as_judge_threshold', 0.8)
# 🔥 FIX: Check both attribute names (inconsistency in codebase)
reflection_lm = getattr(self, '_reflection_lm_client', None) or getattr(self, 'reflection_lm_client', None)
# Debug logging - use INFO so we can see what's happening
self.logger.info(f"Feedback generation: score={composite_score:.4f}, use_llm_judge={use_llm_judge}, threshold={threshold}, has_trace={trace is not None}, has_reflection_lm={reflection_lm is not None}")
if trace:
input_data = trace.get('input_data', {})
predicted = trace.get('predicted_output', '')[:100] if trace.get('predicted_output') else 'N/A'
expected = input_data.get('output', '')[:100] if input_data.get('output') else 'N/A'
self.logger.info(f" Predicted preview: {predicted}...")
self.logger.info(f" Expected preview: {expected}...")
# Use LLM-as-Judge for scores needing improvement
if use_llm_judge and composite_score < threshold and trace:
if not reflection_lm:
self.logger.warning("⚠️ LLM-as-Judge requested but reflection_lm_client not available - using simple feedback")
self.logger.warning(f" Checked: _reflection_lm_client={getattr(self, '_reflection_lm_client', None) is not None}, reflection_lm_client={getattr(self, 'reflection_lm_client', None) is not None}")
else:
try:
self.logger.info(f"🤖 Calling LLM-as-Judge for detailed feedback (score: {composite_score:.4f} < threshold: {threshold})")
feedback = self._llm_as_judge_feedback(
evaluation_results,
trace,
current_prompt
)
self.logger.info(f"✅ LLM-as-Judge returned feedback (length: {len(feedback)} chars)")
return feedback
except Exception as e:
self.logger.error(f"❌ LLM-as-Judge failed: {e}, falling back to simple feedback")
import traceback
self.logger.error(traceback.format_exc())
# Fall through to simple feedback
# Simple actionable feedback (for high scores or as fallback)
if composite_score >= threshold:
self.logger.debug(f"✅ Score {composite_score:.4f} >= threshold {threshold} - using simple feedback")
elif not trace:
self.logger.debug(f"⚠️ No trace provided - using simple feedback")
elif not use_llm_judge:
self.logger.debug(f"⚠️ LLM-as-Judge disabled - using simple feedback")
feedback = self._simple_actionable_feedback(
evaluation_results,
trace,
current_prompt
)
# 🔥 ADD FORMAT FEEDBACK: Append format-specific feedback if available
if self._detected_format and trace:
from ..utils.format_detection import generate_format_feedback
input_data = trace.get('input_data', {})
format_feedback = generate_format_feedback(
predicted_output=trace.get('predicted_output', ''),
expected_output=input_data.get('output', ''),
format_info=self._detected_format
)
if format_feedback:
feedback += format_feedback
return feedback
def _llm_as_judge_feedback(
self,
evaluation_results: Dict[str, Any],
trace: Dict[str, Any],
current_prompt: Optional[str] = None
) -> str:
"""
Generate detailed, actionable feedback using LLM-as-Judge.
🔥 UNIVERSAL VERSION: Works for ANY task type (text, JSON, structured outputs).
No UI-specific assumptions. Pure semantic and structural comparison.
Args:
evaluation_results: Evaluation scores and extracted data
trace: Full trace with input_data, predicted_output, etc.
current_prompt: The current system prompt being optimized
Returns:
Detailed feedback string focused on prompt improvement
"""
# Import universal judge prompt builder
from ..utils.universal_judge_prompt import (
build_universal_judge_prompt,
get_universal_judge_system_prompt,
format_universal_judge_feedback,
build_empty_output_feedback
)
# Extract data from trace
input_data = trace.get('input_data', {})
predicted_output = trace.get('predicted_output', '') or ''
expected_output = input_data.get('output', '') or ''
task_input = input_data.get('input', '') or ''
# Get image if available (for multi-modal tasks)
image_base64 = input_data.get('image', '') or input_data.get('image_base64', '')
# Log what we're working with
self.logger.info(f"LLM-as-Judge input check:")
self.logger.info(f" predicted_output length: {len(predicted_output)} chars")
self.logger.info(f" expected_output length: {len(expected_output)} chars")
self.logger.info(f" image available: {bool(image_base64)} (length: {len(image_base64) if image_base64 else 0} chars)")
self.logger.info(f" predicted_output preview: {predicted_output[:200] if predicted_output else '[EMPTY]'}...")
self.logger.info(f" expected_output preview: {expected_output[:200] if expected_output else '[EMPTY]'}...")
# Handle empty predicted output specially
if not predicted_output or not predicted_output.strip():
self.logger.warning(f"⚠️ Predicted output is empty - generating specialized feedback")
return build_empty_output_feedback(task_input, expected_output, current_prompt)
if not image_base64:
self.logger.debug(f"ℹ️ No image provided - text-only analysis")
# Get the LLM for judging
judge_llm = getattr(self, '_reflection_lm_client', None) or getattr(self, 'reflection_lm_client', None)
if not judge_llm:
self.logger.error("β CRITICAL: No reflection_lm_client available for LLM-as-Judge!")
raise ValueError("reflection_lm_client not available")
# Build the universal judge prompt
judge_prompt = build_universal_judge_prompt(
task_input=task_input,
predicted_output=predicted_output,
expected_output=expected_output,
current_prompt=current_prompt,
evaluation_results=evaluation_results,
image_base64=image_base64
)
# Get the universal system prompt
system_prompt = get_universal_judge_system_prompt(has_image=bool(image_base64))
# Call LLM-as-Judge
try:
self.logger.info(f"π€ Calling Universal LLM-as-Judge for semantic analysis")
result = judge_llm.generate(
system_prompt=system_prompt,
user_prompt=judge_prompt,
image_base64=image_base64 if image_base64 else ""
)
if isinstance(result, dict):
judge_output = result.get('content', '')
else:
judge_output = str(result)
# Format the feedback using the universal formatter
score = evaluation_results.get('composite_score', 0.0)
feedback = format_universal_judge_feedback(
judge_output=judge_output,
task_input=task_input,
predicted_output=predicted_output,
expected_output=expected_output,
score=score
)
# 🔥 ADD FORMAT FEEDBACK: Append format-specific feedback
if self._detected_format:
from ..utils.format_detection import generate_format_feedback
format_feedback = generate_format_feedback(
predicted_output=predicted_output,
expected_output=expected_output,
format_info=self._detected_format
)
if format_feedback:
feedback += format_feedback
# Also add format constraint for next iteration
feedback += f"\n\n{self._detected_format['format_constraint']}"
self.logger.info(f"β
Universal LLM-as-Judge generated feedback")
return feedback
except Exception as e:
self.logger.error(f"LLM-as-Judge failed: {e}")
import traceback
self.logger.error(traceback.format_exc())
# Fallback to simple feedback
return self._simple_actionable_feedback(evaluation_results, trace, current_prompt)
def _extract_reasoning_from_expected(self, expected_output: str) -> str:
"""Extract reasoning section from expected output."""
if not expected_output:
return ""
# Look for "Reason:" or "Reasoning:" section
reason_patterns = [
r'Reason[:\s]+(.*?)(?:\n\n|\Z)',
r'Reasoning[:\s]+(.*?)(?:\n\n|\Z)',
]
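# Illustrative (hypothetical text): for "Label: button\n\nReason: it is the clickable element\n\nNotes: ...",
# the first pattern captures "it is the clickable element".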
for pattern in reason_patterns:
match = re.search(pattern, expected_output, re.IGNORECASE | re.DOTALL)
if match:
return match.group(1).strip()[:500] # Truncate to 500 chars
return ""
def _extract_reasoning_from_predicted(self, predicted_output: str) -> str:
"""Extract reasoning from predicted output if available."""
# Mirrors _extract_reasoning_from_expected, but falls back to the
# first 200 chars if no clear reasoning section is found
if not predicted_output:
return ""
# Look for reasoning patterns
reason_patterns = [
r'Reason[:\s]+(.*?)(?:\n\n|\Z)',
r'Reasoning[:\s]+(.*?)(?:\n\n|\Z)',
]
for pattern in reason_patterns:
match = re.search(pattern, predicted_output, re.IGNORECASE | re.DOTALL)
if match:
return match.group(1).strip()[:500]
# If no reasoning found, return first 200 chars
if len(predicted_output) > 200:
return predicted_output[:200] + "..."
return predicted_output
def _simple_actionable_feedback(
self,
evaluation_results: Dict[str, Any],
trace: Optional[Dict[str, Any]] = None,
current_prompt: Optional[str] = None
) -> str:
"""
Simple feedback without LLM-as-Judge.
🔥 UNIVERSAL VERSION: Works for any task type.
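Example (illustrative): evaluation_results = {'composite_score': 0.65,
'semantic_similarity': 0.7, 'structural_similarity': 0.6} with a non-empty
predicted output yields the "Partial match (score: 65%)" suggestion below.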
"""
composite_score = evaluation_results.get("composite_score", 0.0)
semantic_sim = evaluation_results.get("semantic_similarity", 0.0)
structural_sim = evaluation_results.get("structural_similarity", 0.0)
feedback_parts = []
# Extract task context if available
if trace:
input_data = trace.get('input_data', {})
predicted = trace.get('predicted_output', '')
expected = input_data.get('output', '')
# Check for empty output
if not predicted or not predicted.strip():
feedback_parts.append(
"β CRITICAL: No output generated. "
"Add explicit output instructions to the prompt."
)
# Check for format mismatch
elif structural_sim < 0.5:
feedback_parts.append(
f"β οΈ Format mismatch (structural similarity: {structural_sim:.0%}). "
"Add output format instructions (e.g., 'Return as JSON with fields: ...')."
)
# Check for semantic mismatch
elif semantic_sim < 0.5:
feedback_parts.append(
f"β οΈ Semantic mismatch (similarity: {semantic_sim:.0%}). "
"The output meaning differs from expected. Add clearer task instructions."
)
# Score-based feedback
if composite_score >= 0.9:
feedback_parts.append("β
Excellent match - prompt is working well.")
elif composite_score >= 0.8:
feedback_parts.append("β
Good match - minor refinements possible.")
elif composite_score >= 0.6:
feedback_parts.append(
f"β οΈ Partial match (score: {composite_score:.0%}). "
"Consider adding examples or more specific field names to the prompt."
)
elif composite_score >= 0.3:
feedback_parts.append(
f"β οΈ Low match (score: {composite_score:.0%}). "
"The prompt needs clearer instructions about expected output format and content."
)
else:
feedback_parts.append(
f"β Poor match (score: {composite_score:.0%}). "
"Major revision required - add explicit output format, field names, and examples."
)
return "\n".join(feedback_parts) if feedback_parts else f"Score: {composite_score:.0%}"
def get_best_candidate(self) -> Optional[Dict[str, str]]:
"""
Get the best candidate from GEPA Pareto front.
GEPA Pareto front is the single source of truth because:
- All candidates (GEPA reflection, LLEGO crossover, LLEGO mutation) are evaluated on Dpareto
- All non-dominated candidates are added to GEPA Pareto front
- Therefore, the best candidate MUST be in GEPA Pareto front
Returns:
Best candidate dictionary from GEPA Pareto front, or None if empty
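Example (illustrative return value):
{'system_prompt': '...', 'fitness': 0.92, 'source': 'gepa_pareto_front',
'candidate_type': 'gepa_reflection', 'notation': 'Sᵣ'}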
"""
# PRIMARY: Get best candidate from GEPA Pareto front (single source of truth)
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
if pareto_log.pareto_front:
try:
# Get best candidate from GEPA Pareto front (highest score = best)
gepa_best = max(pareto_log.pareto_front, key=lambda x: x['score'])
gepa_fitness = gepa_best['score']
gepa_prompt = gepa_best['prompt']
gepa_type = gepa_best.get('type', 'unknown')
gepa_notation = gepa_best.get('notation', 'S')
self.logger.info(f"β
Best candidate from GEPA Pareto front: {gepa_notation} with f({gepa_notation})={gepa_fitness:.4f}")
self.logger.info(f" Type: {gepa_type}, Prompt length: {len(gepa_prompt)} chars")
self.logger.info(f" π‘ GEPA Pareto front is single source of truth (all candidates evaluated on Dpareto)")
return {
'system_prompt': gepa_prompt,
'fitness': gepa_fitness,
'source': 'gepa_pareto_front',
'candidate_type': gepa_type,
'notation': gepa_notation
}
except Exception as e:
self.logger.error(f"β Failed to get best from GEPA Pareto front: {e}")
import traceback
self.logger.error(traceback.format_exc())
# EDGE CASE: Pareto front empty (shouldn't happen, but handle gracefully)
self.logger.warning("β οΈ GEPA Pareto front is empty - no best candidate available")
self.logger.warning(" This should not happen if all candidates are evaluated on Dpareto")
return None
def get_best_score(self) -> float:
"""Get the best score from GEPA Pareto front (single source of truth)."""
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
if pareto_log.pareto_front:
try:
gepa_best_fitness = max(p['score'] for p in pareto_log.pareto_front)
return gepa_best_fitness
except Exception as e:
self.logger.warning(f"β οΈ Failed to get best fitness from GEPA Pareto front: {e}")
# Edge case: Pareto front empty - fallback to adapter's score
return self._best_score
def log_proposed_candidate(self, candidate: Dict[str, str], iteration: int = 0):
"""
Pretty print the new proposed candidate prompt.
Args:
candidate: The new candidate prompt from GEPA
iteration: Current optimization iteration
"""
system_prompt = candidate.get('system_prompt', '')
candidate_source = candidate.get('source', 'unknown')
# Store source in adapter state so evaluate() can access it
self._current_evaluation_type = candidate_source
# Also store in mapping by prompt text for lookup
if candidate_source != 'unknown' and system_prompt:
self._candidate_sources[system_prompt] = candidate_source
# Use clean logger for simpler output
from ..utils.clean_logger import get_clean_logger
clean_log = get_clean_logger()
# Update iteration if needed
if iteration > clean_log.current_iteration:
clean_log.log_iteration_start(iteration, seed_prompt=None)
# Don't log here - let evaluate() handle it with full context
def _log_reflection_dataset_creation(self, candidate: Dict[str, str], eval_batch: EvaluationBatch,
components_to_update: List[str]):
"""
Pretty print the reflection dataset creation process.
Args:
candidate: Current candidate being evaluated
eval_batch: Evaluation results
components_to_update: Components being updated
"""
system_prompt = candidate.get('system_prompt', '')
self.logger.info(f"π DEBUG: Inside _log_reflection_dataset_creation")
self.logger.info(f"π DEBUG: system_prompt length: {len(system_prompt)}")
self.logger.info(f"π DEBUG: eval_batch.scores: {eval_batch.scores}")
self.logger.info(f"π DEBUG: eval_batch.trajectories: {len(eval_batch.trajectories) if eval_batch.trajectories else 0}")
# Determine candidate notation
notation_map = {'seed': 'S₀', 'gepa_reflection': 'Sᵣ', 'llego_crossover': 'Oₓₒ', 'llego_mutation': 'Oₘᵤₜ'}
notation = notation_map.get(self._current_evaluation_type, 'S')
cand_num = self._evaluation_count if hasattr(self, '_evaluation_count') else '?'
cand_label = f"{notation}{cand_num}"
# Use logger for the main output too
self.logger.info("\n" + "="*80)
self.logger.info("π REFLECTION DATASET CREATION")
self.logger.info("="*80)
self.logger.info(f"\nπ CURRENT PROMPT BEING ANALYZED: {cand_label}")
self.logger.info(f" Candidate Type: {self._current_evaluation_type or 'unknown'}")
self.logger.info("-" * 40)
self.logger.info(f'"{system_prompt}"')
self.logger.info("-" * 40)
self.logger.info(f"\nπ EVALUATION SUMMARY:")
self.logger.info("-" * 40)
if eval_batch.scores:
avg_score = sum(eval_batch.scores) / len(eval_batch.scores)
min_score = min(eval_batch.scores)
max_score = max(eval_batch.scores)
self.logger.info(f" β’ Average Score: {avg_score:.4f}")
self.logger.info(f" β’ Min Score: {min_score:.4f}")
self.logger.info(f" β’ Max Score: {max_score:.4f}")
self.logger.info(f" β’ Total Samples: {len(eval_batch.scores)}")
self.logger.info(f"\nπ― COMPONENTS TO UPDATE:")
self.logger.info("-" * 40)
for i, component in enumerate(components_to_update, 1):
self.logger.info(f" {i}. {component}")
if eval_batch.trajectories:
self.logger.info(f"\nπ DETAILED ANALYSIS (FULL FEEDBACK - NO TRUNCATION):")
self.logger.info("-" * 80)
for i, trace in enumerate(eval_batch.trajectories[:5], 1): # Show first 5 samples with FULL details
evaluation_results = trace['evaluation_results']
composite_score = evaluation_results.get("composite_score", 0.0)
# Extract element IDs for concise logging
predicted_element = evaluation_results.get('predicted_element', 'Unknown')
expected_element = evaluation_results.get('expected_element', 'Unknown')
# Concise, direct logging with candidate notation
status_icon = "β
" if composite_score == 1.0 else "β"
# Add notation for candidate type
notation_map = {'seed': 'Sβ', 'gepa_reflection': 'Sα΅£', 'llego_crossover': 'Oββ', 'llego_mutation': 'Oβα΅€β'}
notation = notation_map.get(self._current_evaluation_type, 'S')
self.logger.info(f" [{notation}] Sample {i}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status_icon}")
# 🔥 FIX: Pass trace and current_prompt to enable LLM-as-Judge!
feedback = self._generate_feedback(
evaluation_results,
trace=trace, # Pass the full trace!
current_prompt=system_prompt # Pass current prompt being analyzed!
)
self.logger.info(f" π¬ FEEDBACK (FULL):")
self.logger.info(f" \"{feedback}\"")
if len(eval_batch.trajectories) > 5:
self.logger.info(f"\n ... and {len(eval_batch.trajectories) - 5} more samples (all logged similarly)")
self.logger.info("="*80)
def _extract_clean_prompt_from_reflection(self, reflection_output: str) -> str:
"""
🛡️ DEFENSIVE FALLBACK: Extract clean prompt if LLM adds analysis despite system prompt instructions.
NOTE: The system prompt now explicitly instructs the LLM to output ONLY the prompt text.
However, this extraction logic serves as a safety net in case the LLM still adds:
"Based on the performance analysis...
### Recommendations...
### Revised Prompt Example:
[THE ACTUAL PROMPT HERE]
### Conclusion..."
This is now a defensive measure, not the primary mechanism.
Args:
reflection_output: Full reflection output (should be clean prompt, but may contain analysis)
Returns:
str: Clean, extracted prompt (or original if extraction fails or not needed)
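Example (illustrative): given a wrapped reflection such as
"### Revised Prompt:\n<prompt body longer than 50 chars>\n### Conclusion: ..."
the method returns only the prompt body between the header and the next
"###" section.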
"""
if not reflection_output or not isinstance(reflection_output, str):
return reflection_output
# Pattern 1: Look for "Revised Prompt Example:" or "### Revised Prompt Example:"
patterns = [
r'(?:###\s*)?Revised\s+Prompt\s+(?:Example|:)?\s*\n(.*?)(?:\n###|\n##|\n---|\Z)',
r'(?:###\s*)?Revised\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)',
r'(?:###\s*)?Optimized\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)',
r'(?:###\s*)?New\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)',
r'(?:Here\s+is|Here\'s)\s+a?\s*refined?\s+(?:version\s+of\s+)?(?:the\s+)?prompt\s*[:\n](.*?)(?:\n###|\n##|\n---|\Z)',
]
for pattern in patterns:
match = re.search(pattern, reflection_output, re.IGNORECASE | re.DOTALL)
if match:
extracted = match.group(1).strip()
# Clean up common artifacts
extracted = re.sub(r'^```(?:plaintext|markdown|text)?\s*\n', '', extracted, flags=re.MULTILINE)
extracted = re.sub(r'\n```\s*$', '', extracted, flags=re.MULTILINE)
extracted = extracted.strip()
if len(extracted) > 50: # Reasonable minimum length for a prompt
self.logger.debug(f"β
Extracted clean prompt using pattern: {pattern[:50]}...")
self.logger.debug(f" Original length: {len(reflection_output)} chars")
self.logger.debug(f" Extracted length: {len(extracted)} chars")
return extracted
# Pattern 2: If output starts with a quote or prompt-like structure
# Look for text that starts with "You are..." and is substantial
if 'You are' in reflection_output:
# Find the longest continuous block that starts with "You are"
prompt_match = re.search(r'(You are[^#]*?)(?:\n###|\n##|###|##|Conclusion|\Z)',
reflection_output, re.IGNORECASE | re.DOTALL)
if prompt_match:
extracted = prompt_match.group(1).strip()
if len(extracted) > 50:
self.logger.debug(f"β
Extracted prompt starting with 'You are...'")
return extracted
# Pattern 3: If the reflection output is actually just a clean prompt (no analysis)
# Check if it's relatively short and doesn't contain analysis keywords
analysis_keywords = ['recommendation', 'suggestion', 'improvement', 'conclusion',
'optimization', 'analysis', 'feedback']
if (len(reflection_output) < 2000 and
not any(keyword in reflection_output.lower() for keyword in analysis_keywords)):
# Likely a clean prompt, return as-is
self.logger.debug(f"β
Reflection output appears to be a clean prompt (no analysis detected)")
return reflection_output.strip()
# Fallback: Return original (with warning)
self.logger.warning(f"β οΈ Could not extract clean prompt from reflection output")
self.logger.warning(f" Output length: {len(reflection_output)} chars")
self.logger.warning(f" Output preview: {reflection_output[:200]}...")
self.logger.warning(f" Returning original output (may contain analysis text)")
return reflection_output.strip()
def _parse_json_variations(self, response_text: str, num_expected: int) -> List[str]:
"""
🔥 OPTIMIZED: Parse N prompt variations from JSON format response.
Uses robust JSON parsing with multiple fallback strategies:
1. Extract JSON from markdown code blocks (```json ... ```)
2. Find JSON object directly in text
3. Attempt JSON repair for common issues
4. Fallback to numbered section parsing if JSON fails
Args:
response_text: LLM response containing JSON with variations
num_expected: Expected number of variations
Returns:
List[str]: List of prompt variations (in order by index)
Raises:
ValueError: If parsing fails and no valid variations found
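Example (illustrative, Strategy 1 - fenced JSON):
response = '```json\n{"variations": [{"index": 1, "prompt": "Do X."}, {"index": 2, "prompt": "Do Y."}]}\n```'
self._parse_json_variations(response, 2)  # -> ['Do X.', 'Do Y.']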
"""
import json
import re
if not response_text or not isinstance(response_text, str):
raise ValueError("Empty or invalid response text")
# Strategy 1: Extract JSON from markdown code block
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(1)
try:
data = json.loads(json_str)
return self._extract_variations_from_json(data, num_expected)
except json.JSONDecodeError as e:
self.logger.debug(f"JSON in code block invalid: {e}, trying repair...")
# Strategy 2: Find JSON object directly in text
json_match = re.search(r'\{[^{}]*"variations"[^{}]*\[.*?\]\s*[^{}]*\}', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(0)
try:
data = json.loads(json_str)
return self._extract_variations_from_json(data, num_expected)
except json.JSONDecodeError:
# Try to find largest JSON object
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(0))
return self._extract_variations_from_json(data, num_expected)
except json.JSONDecodeError:
pass
# Strategy 3: Attempt JSON repair (common issues: trailing commas, unescaped quotes)
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(0)
# Try common repairs
repaired = re.sub(r',\s*}', '}', json_str) # Remove trailing commas before }
repaired = re.sub(r',\s*]', ']', repaired) # Remove trailing commas before ]
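# Illustrative: '{"variations": [{"index": 1, "prompt": "..."},]}' becomes valid JSON
# once the trailing comma before ']' is removed.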
try:
data = json.loads(repaired)
return self._extract_variations_from_json(data, num_expected)
except json.JSONDecodeError:
pass
# Strategy 4: Fallback to numbered section parsing
self.logger.warning(f"JSON parsing failed, trying numbered section fallback...")
try:
return self._parse_numbered_section_variations(response_text, num_expected)
except ValueError:
pass
# All strategies failed
raise ValueError(f"Could not parse {num_expected} variations from response. Response preview: {response_text[:300]}...")
def _extract_variations_from_json(self, data: Dict[str, Any], num_expected: int) -> List[str]:
"""Extract and validate variations from parsed JSON data."""
if not isinstance(data, dict):
raise ValueError("JSON data is not a dictionary")
variations_list = data.get('variations', [])
if not isinstance(variations_list, list):
raise ValueError("'variations' field is not a list")
if len(variations_list) < num_expected:
self.logger.warning(f"Expected {num_expected} variations, found {len(variations_list)} in JSON")
# Extract and sort by index
variations_with_index = []
for var in variations_list:
if not isinstance(var, dict):
continue
index = var.get('index', 0)
prompt = var.get('prompt', '')
if prompt and isinstance(prompt, str):
variations_with_index.append((index, prompt.strip()))
# Sort by index
variations_with_index.sort(key=lambda x: x[0])
# Extract just the prompts
variations = [v[1] for v in variations_with_index]
# Validate count
if len(variations) < num_expected:
self.logger.warning(f"Only {len(variations)} valid variations found, expected {num_expected}")
# Pad with duplicates if needed (not ideal but better than failing)
while len(variations) < num_expected:
variations.append(variations[-1] if variations else "")
# Take first N if we got more
variations = variations[:num_expected]
# Validate all variations are non-empty
if not all(v for v in variations):
raise ValueError(f"Some variations are empty after parsing")
return variations
def _parse_numbered_section_variations(self, response_text: str, num_expected: int) -> List[str]:
"""
Fallback parser: Extract variations from numbered sections.
Format: --- VARIATION N --- or Variation N: or similar
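Example input (illustrative):
--- VARIATION 1 ---
<first prompt text>
--- VARIATION 2 ---
<second prompt text>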
"""
variations = []
# Pattern 1: --- VARIATION N ---
pattern1 = r'---\s*VARIATION\s+(\d+)\s*---\s*\n(.*?)(?=\n---\s*VARIATION|\Z)'
matches1 = re.findall(pattern1, response_text, re.DOTALL | re.IGNORECASE)
# Pattern 2: Variation N:
pattern2 = r'Variation\s+(\d+)\s*:?\s*\n(.*?)(?=\nVariation\s+\d+|$)'
matches2 = re.findall(pattern2, response_text, re.DOTALL | re.IGNORECASE)
# Pattern 3: Numbered list (1. 2. 3.)
pattern3 = r'(\d+)\.\s*\n(.*?)(?=\n\d+\.|$)'
matches3 = re.findall(pattern3, response_text, re.DOTALL)
# Use the pattern with most matches
matches = matches1 if len(matches1) >= num_expected else (matches2 if len(matches2) >= num_expected else matches3)
if len(matches) >= num_expected:
# Sort by index
matches.sort(key=lambda x: int(x[0]))
# Extract prompts
variations = [match[1].strip() for match in matches[:num_expected]]
if len(variations) != num_expected:
raise ValueError(f"Numbered section parsing found {len(variations)} variations, expected {num_expected}")
return variations
def _generate_hybrid_candidates_adapter_level(
self,
current_prompt: str,
eval_batch: EvaluationBatch,
candidate: Dict[str, str]
) -> List[str]:
"""
🔥 ADAPTER-LEVEL HYBRID CANDIDATE GENERATION
Generate candidates from BOTH GEPA reflection AND LLEGO operators
when GEPA's adapter mode ignores the reflection_lm parameter.
This method:
1. Builds comprehensive feedback from evaluation results
2. Generates GEPA reflection candidates
3. Generates LLEGO crossover/mutation candidates
4. Logs ALL candidates with FULL prompts (no truncation)
5. Stores candidates for potential use
Args:
current_prompt: The current prompt being optimized
eval_batch: Evaluation results with trajectories
candidate: Current candidate dict
Returns:
List of candidate dicts, each with 'prompt', 'source', and 'index' keys
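Example (illustrative element of the returned list):
{'prompt': 'You are ...', 'source': 'llego_crossover', 'index': 1}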
"""
try:
from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
all_candidates = []
gepa_count = 0
# 🔥 CRITICAL: Pass format info to LLM client before generating candidates
if self._detected_format and self._reflection_lm_client:
if isinstance(self._reflection_lm_client, LLEGOEnhancedLLMClient):
self._reflection_lm_client._detected_format = self._detected_format
self.logger.info(f"📋 Passed format info to reflection LLM: {self._detected_format['format_type']}")
self.logger.info(f"🔥 STEP 1: Building comprehensive feedback from evaluation")
# 🔥 REMOVED: Excessive diagnostic logs - moved to DEBUG level
# Build comprehensive feedback text from trajectories
if not hasattr(eval_batch, 'trajectories'):
self.logger.error(f"β eval_batch has no 'trajectories' attribute! Type: {type(eval_batch)}")
return []
trajectories = eval_batch.trajectories
if not trajectories:
self.logger.warning(f"β οΈ eval_batch.trajectories is empty - no feedback to generate candidates from")
return []
self.logger.debug(f"Processing {len(trajectories)} trajectories for feedback generation")
feedback_lines = []
feedback_lines.append(f"Current prompt performance analysis:\n")
feedback_lines.append(f"Current prompt:\n{current_prompt}\n")
feedback_lines.append(f"\nEvaluation results:\n")
for i, trace in enumerate(trajectories[:8], 1): # Use up to 8 samples for feedback
try:
eval_results = trace.get('evaluation_results', {})
score = eval_results.get("composite_score", 0.0) if isinstance(eval_results, dict) else 0.0
input_data = trace.get('input_data', {})
predicted = trace.get('predicted_output', '')
expected = input_data.get('output', '') if isinstance(input_data, dict) else ''
# 🔥 FIX: Clean input_data to remove base64 images before logging
input_data_clean = input_data.copy() if isinstance(input_data, dict) else {}
if 'image_base64' in input_data_clean:
input_data_clean['image_base64'] = f"[IMAGE_DATA_{len(input_data_clean['image_base64'])}_chars]"
feedback_lines.append(f" Sample {i}:")
feedback_lines.append(f" Input: {input_data_clean.get('input', '') if isinstance(input_data_clean, dict) else ''}")
feedback_lines.append(f" Expected: {expected}")
feedback_lines.append(f" Predicted: {predicted}")
feedback_lines.append(f" Score: {score:.4f}")
if isinstance(eval_results, dict):
# 🔥 FIX: Pass trace and current_prompt to enable LLM-as-Judge!
feedback = self._generate_feedback(
eval_results,
trace=trace, # Pass the full trace!
current_prompt=current_prompt # Pass current prompt!
)
feedback_lines.append(f" Feedback: {feedback}")
else:
feedback_lines.append(f" Feedback: Evaluation results not in expected format")
feedback_lines.append("")
except Exception as e:
self.logger.error(f"β Error processing trace {i}: {e}")
import traceback
self.logger.error(traceback.format_exc())
continue
feedback_text = "\n".join(feedback_lines)
self.logger.info(f"\nπ FULL FEEDBACK TEXT (NO TRUNCATION):")
self.logger.info(feedback_text)
# ─────────────────────────────────────────────────────
# PART 2: GEPA REFLECTION CANDIDATES
# ─────────────────────────────────────────────────────
self.logger.info(f"🔄 PART 2: GEPA REFLECTION - Semantic Understanding")
num_gepa = self._config.num_gepa_reflection_candidates if hasattr(self._config, 'num_gepa_reflection_candidates') else 3
self.logger.info(f"\nπ Generating {num_gepa} GEPA Reflection candidates in single optimized call...")
# Set reflection context
if isinstance(self._reflection_lm_client, LLEGOEnhancedLLMClient):
self._reflection_lm_client.set_reflection_context(
current_prompt=current_prompt,
feedback=eval_batch,
in_reflection=True
)
# π₯ OPTIMIZED: Single call with JSON format for multiple variations
try:
# Precision-engineered system prompt requesting JSON format
optimization_system_prompt = f"""You are an expert prompt engineer specializing in iterative prompt optimization.
Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate {num_gepa} DISTINCT variations of improved prompts that address the identified issues through DIFFERENT improvement strategies.
CRITICAL OUTPUT FORMAT - MUST BE VALID JSON:
{{
"variations": [
{{
"index": 1,
"prompt": "[First improved prompt text - complete and self-contained]"
}},
{{
"index": 2,
"prompt": "[Second improved prompt text - complete and self-contained]"
}},
{{
"index": 3,
"prompt": "[Third improved prompt text - complete and self-contained]"
}}
]
}}
DIVERSITY REQUIREMENTS:
- Variation 1: Focus on clarity, specificity, and explicit instructions
- Variation 2: Focus on edge case handling, robustness, and error prevention
- Variation 3: Focus on structural organization, examples, and step-by-step guidance
- Each variation must be MEANINGFULLY DIFFERENT (not just rewordings)
- Each variation must address ALL feedback issues but through different approaches
QUALITY STANDARDS (apply to all variations):
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure each prompt is self-contained and unambiguous
- Preserve the core task domain and output format requirements
OUTPUT FORMAT:
- Output MUST be valid JSON (can be wrapped in ```json ... ``` markdown code block)
- Generate EXACTLY {num_gepa} variations
- Index must be 1, 2, 3, ... (sequential, starting at 1)
- Each "prompt" field must contain the complete, self-contained prompt text
- NO explanations, NO analysis, NO meta-commentary - just the JSON structure
DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)
- Any text outside the JSON structure
Output ONLY the JSON object with the variations."""
# Construct user prompt with clear structure
optimization_user_prompt = f"""CURRENT PROMPT (to be improved):
{current_prompt}
{feedback_text}
TASK: Generate {num_gepa} DISTINCT variations of improved prompts. Each variation should:
- Address ALL feedback issues identified above
- Use a DIFFERENT improvement strategy (clarity, robustness, structure)
- Be meaningfully different from the others (not just rewordings)
- Be complete and self-contained
Remember: Output ONLY the JSON object with {num_gepa} variations. No explanations."""
result = self._reflection_lm_client.generate(
system_prompt=optimization_system_prompt,
user_prompt=optimization_user_prompt,
image_base64=""
)
if isinstance(result, dict):
response_text = result.get("content", str(result))
else:
response_text = str(result)
# Parse JSON variations
gepa_variations = self._parse_json_variations(response_text, num_gepa)
# Add all variations to candidates
for idx, variation_prompt in enumerate(gepa_variations, 1):
# 🛡️ DEFENSIVE FALLBACK: Extract clean prompt if LLM adds analysis despite instructions
gepa_candidate = self._extract_clean_prompt_from_reflection(variation_prompt)
if gepa_candidate != variation_prompt:
self.logger.debug(f" Variation {idx}: Extracted clean prompt (removed {len(variation_prompt) - len(gepa_candidate)} chars)")
all_candidates.append({
'prompt': gepa_candidate,
'source': 'gepa_reflection',
'index': idx
})
# 🔥 CAPTURE CANDIDATE FOR LIVE UI DISPLAY
try:
import sys
if 'app' in sys.modules:
app_module = sys.modules['app']
if hasattr(app_module, 'add_candidate_to_store'):
app_module.add_candidate_to_store({
'prompt': gepa_candidate,
'source': 'gepa_reflection',
'timestamp': f"Candidate #{idx}"
})
except Exception:
pass # Silent fail - UI capture is optional
self.logger.info(f"\nβ
GEPA REFLECTION CANDIDATE #{idx}/{num_gepa} (FULL PROMPT - NO TRUNCATION):")
self.logger.info(f"{'β'*80}")
self.logger.info(f"{gepa_candidate}")
self.logger.info(f"{'β'*80}")
self.logger.info(f" Length: {len(gepa_candidate)} chars, Words: {len(gepa_candidate.split())}")
gepa_count = len(all_candidates)
self.logger.info(f"\nβ
GEPA Reflection: {gepa_count} candidates generated in single optimized call")
except Exception as e:
self.logger.error(f"β Error generating GEPA reflection candidates: {e}")
self.logger.warning(f" Falling back to sequential generation...")
import traceback
self.logger.debug(traceback.format_exc())
# Fallback: Sequential generation (when JSON parsing fails)
for i in range(num_gepa):
self.logger.info(f"\nπ Generating GEPA Reflection candidate #{i+1}/{num_gepa} (fallback mode)...")
try:
fallback_user_prompt = f"""CURRENT PROMPT (to be improved):
{current_prompt}
{feedback_text}
TASK: Generate an improved version of the CURRENT PROMPT that addresses all issues identified in the evaluation feedback above.
Remember: Output ONLY the improved prompt text. No explanations."""
result = self._reflection_lm_client.generate(
system_prompt=self._FALLBACK_SYSTEM_PROMPT,
user_prompt=fallback_user_prompt,
image_base64=""
)
if isinstance(result, dict):
gepa_candidate_raw = result.get("content", str(result))
else:
gepa_candidate_raw = str(result)
gepa_candidate = self._extract_clean_prompt_from_reflection(gepa_candidate_raw)
all_candidates.append({
'prompt': gepa_candidate,
'source': 'gepa_reflection',
'index': i + 1
})
# 🔥 CAPTURE CANDIDATE FOR LIVE UI DISPLAY
try:
import sys
if 'app' in sys.modules:
app_module = sys.modules['app']
if hasattr(app_module, 'add_candidate_to_store'):
app_module.add_candidate_to_store({
'prompt': gepa_candidate,
'source': 'gepa_reflection',
'timestamp': f"Fallback #{i+1}"
})
except Exception:
pass # Silent fail - UI capture is optional
except Exception as fallback_error:
self.logger.error(f"β Error in fallback generation #{i+1}: {fallback_error}")
gepa_count = len(all_candidates)
if gepa_count > 0:
self.logger.info(f"\nβ
GEPA Reflection: {gepa_count} candidates generated")
# ─────────────────────────────────────────────────────
# PART 3: LLEGO GENETIC OPERATORS
# ─────────────────────────────────────────────────────
self.logger.info(f"🧬 PART 3: LLEGO GENETIC OPERATORS - Structural Diversity")
if self.llego:
# 🔥 FIX 2: Get Pareto front from GEPA (not LLEGO population)
# This ensures LLEGO operators use true non-dominated solutions
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
gepa_pareto_front = pareto_log.pareto_front
# Convert GEPA Pareto front to PromptCandidate format
pareto_candidates = self.llego._convert_gepa_pareto_to_candidates(gepa_pareto_front)
pareto_front = pareto_candidates
self.logger.info(f" Using GEPA Pareto front (size: {len(gepa_pareto_front)})")
self.logger.info(f" Converted to {len(pareto_front)} PromptCandidate objects")
for idx, p in enumerate(pareto_front, 1):
cand_type = p.metadata.get('candidate_type', 'unknown') if p.metadata else 'unknown'
notation = p.metadata.get('notation', 'S') if p.metadata else 'S'
self.logger.info(f" {notation}: [fitness={p.fitness:.3f}, type={cand_type}, length={len(p.prompt)} chars]")
# Create LLM callable for LLEGO
def llm_callable(genetic_prompt: str) -> str:
# 🔥 LLEGO genetic prompt already contains full instructions
# Use minimal system prompt to avoid instruction conflict
result = self._reflection_lm_client.generate(
system_prompt="You are an expert prompt engineer. Follow the instructions provided in the user message to generate an improved prompt. Output only the prompt text, no explanations.",
user_prompt=genetic_prompt,
image_base64=""
)
if isinstance(result, dict):
return result.get('content', str(result))
return str(result)
# Generate LLEGO offspring
try:
llego_prompts = self.llego.evolve_generation(
llm=llm_callable,
pareto_front=pareto_front
)
n_crossover = self._config.n_crossover if hasattr(self._config, 'n_crossover') else 2
crossover_count = min(n_crossover, len(llego_prompts))
for i, prompt in enumerate(llego_prompts):
if i < crossover_count:
source = 'llego_crossover'
else:
source = 'llego_mutation'
all_candidates.append({
'prompt': prompt,
'source': source,
'index': i + 1
})
# 🔥 CAPTURE CANDIDATE FOR LIVE UI DISPLAY
try:
import sys
if 'app' in sys.modules:
app_module = sys.modules['app']
if hasattr(app_module, 'add_candidate_to_store'):
app_module.add_candidate_to_store({
'prompt': prompt,
'source': source,
'timestamp': f"Candidate #{i+1}"
})
except Exception:
pass # Silent fail - UI capture is optional
border_char = "β" if source == 'llego_crossover' else "β"
self.logger.info(f"\n{border_char*80}")
self.logger.info(f"{border_char} {'π LLEGO CROSSOVER' if source == 'llego_crossover' else 'π² LLEGO MUTATION'} candidate #{i+1}")
self.logger.info(f"{border_char*80}")
self.logger.info(f"{prompt}")
self.logger.info(f"{border_char*80}")
self.logger.info(f" Length: {len(prompt)} chars, Words: {len(prompt.split())}")
self.logger.info(f"β
LLEGO Genetic Operators: {len(llego_prompts)} candidates generated")
except Exception as e:
self.logger.error(f"β Error generating LLEGO candidates: {e}")
import traceback
self.logger.error(traceback.format_exc())
# ─────────────────────────────────────────────────────
# SUMMARY
# ─────────────────────────────────────────────────────
self.logger.info(f"\n{'='*80}")
self.logger.info(f"π ADAPTER-LEVEL HYBRID GENERATION SUMMARY")
self.logger.info(f"{'='*80}")
self.logger.info(f" π GEPA Reflection: {gepa_count} candidates")
self.logger.info(f" π LLEGO Crossover: {len([c for c in all_candidates if c['source'] == 'llego_crossover'])} candidates")
self.logger.info(f" π² LLEGO Mutation: {len([c for c in all_candidates if c['source'] == 'llego_mutation'])} candidates")
self.logger.info(f" π¦ TOTAL: {len(all_candidates)} diverse candidates")
self.logger.info(f"{'='*80}\n")
# Store candidates (GEPA might access them through some mechanism)
self._generated_candidates = all_candidates
# Log each candidate with FULL text
self.logger.info(f"\n{'='*80}")
self.logger.info(f"π ALL GENERATED CANDIDATES (FULL PROMPTS - NO TRUNCATION)")
self.logger.info(f"{'='*80}")
for i, cand in enumerate(all_candidates, 1):
source_emoji = "π" if cand['source'] == 'gepa_reflection' else "π" if cand['source'] == 'llego_crossover' else "π²"
self.logger.info(f"\n{source_emoji} CANDIDATE #{i} - {cand['source'].upper().replace('_', ' ')}")
self.logger.info(f"{cand['prompt']}")
self.logger.info(f" Length: {len(cand['prompt'])} characters")
self.logger.info(f" Words: {len(cand['prompt'].split())} words")
self.logger.info(f"{'='*80}\n")
# Return candidates as list of dicts with metadata (not just strings)
# This ensures source information is preserved
return all_candidates # Return full dicts with source info
except Exception as e:
self.logger.error(f"\n{'β'*80}")
self.logger.error(f"β CRITICAL ERROR in _generate_hybrid_candidates_adapter_level!")
self.logger.error(f"β Error: {str(e)}")
self.logger.error(f"{'β'*80}\n")
import traceback
self.logger.error(traceback.format_exc())
return []
def propose_new_texts(
self,
candidate: Dict[str, str],
reflective_dataset: Dict[str, List[Dict[str, Any]]],
components_to_update: List[str]
) -> Dict[str, str]:
"""
π₯ CRITICAL: This method is called by GEPA to propose new component texts.
This is the KEY integration point - GEPA checks if adapter.propose_new_texts exists,
and if it does, uses it instead of the default InstructionProposalSignature.
This method:
1. Uses reflective_dataset to generate improved prompts
2. Optionally uses LLEGO for additional diversity
3. Returns dict mapping component_name -> new component text
Args:
candidate: Current candidate dict (component_name -> component_text)
reflective_dataset: Feedback data per component (from make_reflective_dataset)
components_to_update: List of component names to update
Returns:
Dict mapping component_name -> new component text
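Example (illustrative): with components_to_update=['system_prompt'], the
return value is {'system_prompt': '<improved prompt text>'}.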
"""
self.logger.info(f"\n{'='*80}")
self.logger.info(f"π― PROPOSE_NEW_TEXTS CALLED BY GEPA")
self.logger.info(f"{'='*80}")
self.logger.info(f" Components to update: {components_to_update}")
self.logger.info(f" Reflective dataset keys: {list(reflective_dataset.keys())}")
# 🔥 FIX: Check if we already generated candidates in hybrid mode
# If yes, return one of them instead of generating a new one (avoids duplicate work and context overflow)
if hasattr(self, '_generated_candidates') and self._generated_candidates:
self.logger.info(f"\n✅ HYBRID MODE: Using pre-generated candidates from make_reflective_dataset")
self.logger.info(f" Available candidates: {len(self._generated_candidates)}")
self.logger.info(f" Returning first candidate (GEPA will evaluate all of them)")
# Return the first candidate (GEPA will get others via queue)
first_candidate = self._generated_candidates[0]
new_texts = {}
for component in components_to_update:
if isinstance(first_candidate, dict) and 'prompt' in first_candidate:
new_texts[component] = first_candidate['prompt']
source = first_candidate.get('source', 'unknown')
self.logger.info(f" Returning {source} candidate (length: {len(first_candidate['prompt'])} chars)")
else:
new_texts[component] = str(first_candidate)
self.logger.info(f"{'='*80}\n")
return new_texts
new_texts = {}
# Check if we have reflection_lm_client (required for proposal)
if not self._reflection_lm_client:
self.logger.error("β reflection_lm_client not available - cannot generate proposals")
# Fallback: return current candidate (no change)
for component in components_to_update:
new_texts[component] = candidate.get(component, '')
return new_texts
# For each component to update
for component_name in components_to_update:
self.logger.info(f"π Proposing new text for component: {component_name}")
current_text = candidate.get(component_name, '')
dataset = reflective_dataset.get(component_name, [])
if not dataset:
self.logger.warning(f"β οΈ No feedback data for {component_name}, keeping current text")
new_texts[component_name] = current_text
continue
self.logger.info(f" Current text length: {len(current_text)} chars")
self.logger.info(f" Feedback examples: {len(dataset)}")
# Generate improved prompt using reflection LM
try:
# 🔥 FIX: Clean dataset to remove base64 images (prevents context overflow)
cleaned_dataset = []
for item in dataset:
cleaned_item = item.copy()
# Remove or truncate base64 image data
if 'image_base64' in cleaned_item:
img_len = len(cleaned_item['image_base64'])
cleaned_item['image_base64'] = f'[IMAGE_DATA_REMOVED_{img_len}_chars]'
if 'image' in cleaned_item and isinstance(cleaned_item['image'], str) and len(cleaned_item['image']) > 1000:
img_len = len(cleaned_item['image'])
cleaned_item['image'] = f'[IMAGE_DATA_REMOVED_{img_len}_chars]'
# Also clean any nested detailed_scores
if 'detailed_scores' in cleaned_item and isinstance(cleaned_item['detailed_scores'], dict):
for key in list(cleaned_item['detailed_scores'].keys()):
val = cleaned_item['detailed_scores'][key]
if isinstance(val, str) and len(val) > 5000:
cleaned_item['detailed_scores'][key] = f'[LARGE_DATA_REMOVED_{len(val)}_chars]'
cleaned_dataset.append(cleaned_item)
self.logger.info(f" π Cleaned dataset: removed base64 images to prevent context overflow")
# Use GEPA's default instruction proposal format
from gepa.strategies.instruction_proposal import InstructionProposalSignature
# Build input dict for GEPA's instruction proposal
input_dict = {
"current_instruction_doc": current_text,
"dataset_with_feedback": cleaned_dataset # Use cleaned dataset!
}
# Generate prompt using GEPA's signature
prompt = InstructionProposalSignature.prompt_renderer(input_dict)
# Call reflection LM to generate new instruction
self.logger.info(f" Generating improved prompt via reflection LM...")
result = self._reflection_lm_client.generate(
system_prompt="You are an expert prompt engineer. Follow the instructions in the user message to generate an improved prompt.",
user_prompt=prompt,
image_base64=""
)
# Extract response
if isinstance(result, dict):
response_text = result.get("content", str(result))
else:
response_text = str(result)
# Extract instruction using GEPA's extractor
extracted = InstructionProposalSignature.output_extractor(response_text)
new_instruction = extracted.get("new_instruction", response_text.strip())
# Clean up the instruction (remove markdown, quotes, etc.)
new_instruction = self._clean_extracted_prompt(new_instruction)
self.logger.info(f" β
Generated new text (length: {len(new_instruction)} chars)")
self.logger.info(f" Preview: {new_instruction[:150]}...")
new_texts[component_name] = new_instruction
except Exception as e:
self.logger.error(f"β Error generating proposal for {component_name}: {e}")
import traceback
self.logger.error(traceback.format_exc())
# Fallback: return current text
new_texts[component_name] = current_text
self.logger.info(f"\n{'='*80}")
self.logger.info(f"β
PROPOSE_NEW_TEXTS COMPLETE")
self.logger.info(f" Generated {len(new_texts)} new component texts")
self.logger.info(f"{'='*80}\n")
return new_texts
def _clean_extracted_prompt(self, prompt: str) -> str:
"""
Clean extracted prompt by removing markdown, quotes, and extra whitespace.
Args:
prompt: Raw extracted prompt text
Returns:
Cleaned prompt text
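Example (illustrative):
>>> self._clean_extracted_prompt('```text\n"Do the task."\n```')
'Do the task.'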
"""
if not prompt:
return prompt
# Remove markdown code blocks
prompt = re.sub(r'```[\w]*\n?', '', prompt)
prompt = re.sub(r'```', '', prompt)
# Remove quotes if entire prompt is quoted
prompt = prompt.strip()
if (prompt.startswith('"') and prompt.endswith('"')) or \
(prompt.startswith("'") and prompt.endswith("'")):
prompt = prompt[1:-1]
# Remove leading/trailing whitespace
prompt = prompt.strip()
return prompt |