# RustworkX Agent Framework – Full Documentation
<p align="center">
<strong>A modern graph-based framework for multi-agent systems</strong>
</p>
<p align="center">
<em>A flexible, high-performance alternative to LangGraph with dynamic topology, decentralized memory, and full access to graph structures</em>
</p>
---
## Table of Contents
- [Introduction](#introduction)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Key Concepts](#key-concepts)
- [Core Components](#core-components)
- [RoleGraph](#rolegraph)
- [AgentProfile](#agentprofile)
- [TaskNode](#tasknode)
- [NodeEncoder](#nodeencoder)
- [MACPRunner](#macprunner)
- [Scheduler](#scheduler)
- [Memory System](#memory-system)
- [Streaming API](#streaming-api)
- [Token Budget](#token-budget-budget-system)
- [Error Handling](#error-handling-error-handling)
- [Graph Algorithms](#graph-algorithms-graph-algorithms)
- [Metrics Tracking](#metrics-tracking-metrics-tracker)
- [Visualization](#visualization-visualization)
- [Graph Schemas](#graph-schemas-schema-system)
- [Builder API](#builder-api-detailed)
- [Event System](#event-system-event-system)
- [Callback System (LangChain-like)](#callback-system)
- [State Storage](#state-storage-state-storage)
- [Async Utilities](#async-utilities-async-utils)
- [Conditional Routing](#conditional-routing-conditional-routing)
- [Agent Tools (Tools)](#agent-tools-tools)
- [Advanced Features](#advanced-features)
- [Execution Optimization and Token Savings](#execution-optimization-and-token-savings)
- [Multi-Model Support](#multi-model-support-multi-model-support)
- [Structured Prompt – modern chat LLMs (recommended)](#structured-prompt--modern-chat-llms-recommended)
- [Built-in factory helpers](#built-in-factory-helpers-recommended-zero-boilerplate)
- [Dynamic Topology](#dynamic-topology)
- [GNN Routing](#gnn-routing)
- [Hidden Channels](#hidden-channels)
- [Adaptive Execution](#adaptive-execution)
- [Configuration](#configuration)
- [Usage Examples](#usage-examples)
- [API Reference](#api-reference)
- [FAQ](#faq)
---
## Introduction
**RustworkX Agent Framework** (gMAS) is a framework for building multi-agent systems that uses the `rustworkx` library for high-performance graph operations. It addresses key limitations of existing solutions such as LangGraph:
### Why is gMAS better than LangGraph?
| Feature | LangGraph | gMAS Framework |
|-------------|-----------|----------------|
| **Topology** | Fixed | **Dynamic** (runtime changes via hooks) |
| **Token optimization** | Minimal | **Automatic** (filtering isolated nodes, disabled nodes, early stopping) |
| **Memory** | Centralized | Decentralized (agents' local state) |
| **Graph** | Hidden from the developer | First-class citizen (full access) |
| **Representations** | Text only | Text + embeddings + hidden states |
| **Typing and validation** | Minimal | **Full Pydantic validation** (type safety) |
| **Data schemas** | Informal | **Pydantic BaseModel** (auto-validation, serialization) |
| **Multi-model** | Limited | Full support for different LLMs per agent |
| **Parallelism** | Limited | Full async/parallel support |
| **ML integration** | None | PyTorch Geometric, GNN routing, RL hooks |
| **Serialization** | Manual | **Automatic** (Pydantic `.model_dump()`) |
| **Runtime adaptation** | None | **Topology hooks, early stopping, disabled nodes** |
| **Callbacks** | BaseCallbackHandler | **Full compatibility** (same methods: on_run_start, on_agent_end, on_tool_start/end/error, etc.) |
---
## Installation
### Requirements
- Python 3.12+
- PyTorch 2.0+
- **Pydantic 2.0+** (required: the framework is fully built on Pydantic)
### Via pip (from sources)
```bash
git clone https://github.com/yourusername/rustworkx-agent-framework.git
cd rustworkx-agent-framework
pip install -e .
```
### Dependencies
```bash
# Core (required)
pip install rustworkx>=0.13 pydantic>=2.0 pydantic-settings>=2.0 torch>=2.0 loguru>=0.7
# For embeddings (optional)
pip install sentence-transformers>=2.0
# For GNN routing (optional)
pip install torch-geometric>=2.0
# For visualization (optional)
pip install rich>=13.0 graphviz>=0.20
```
### Install all optional dependencies
```bash
pip install -e ".[all]"
```
### Important: Pydantic 2.0+
gMAS Framework **requires Pydantic 2.0+** and is incompatible with Pydantic 1.x. All models (`AgentProfile`, `TaskNode`, schemas, configurations) use the Pydantic v2 API:
- `.model_dump()` instead of `.dict()`
- `.model_validate()` instead of `.parse_obj()`
- `.model_dump_json()` instead of `.json()`
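A minimal before/after sketch of the renamed methods (plain Pydantic, no framework imports; the `Point` model is purely illustrative):
```python
from pydantic import BaseModel

class Point(BaseModel):
    x: int
    y: int

p = Point(x=1, y=2)
data = p.model_dump()            # v2 (was p.dict() in v1)
js = p.model_dump_json()         # v2 (was p.json() in v1)
p2 = Point.model_validate(data)  # v2 (was Point.parse_obj(data) in v1)
```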
If you have Pydantic 1.x installed:
```bash
pip install --upgrade "pydantic>=2.0"
```
---
## Quick Start
### Minimal example
```python
from core import AgentProfile, RoleGraph
from execution import MACPRunner
from builder import build_property_graph
# 1. Define agents
agents = [
AgentProfile(
agent_id="solver",
display_name="Math Solver",
description="Solves math problems step by step",
tools=["calculator"],
),
AgentProfile(
agent_id="checker",
display_name="Answer Checker",
description="Checks solutions for correctness",
),
]
# 2. Define connections between agents
workflow_edges = [("solver", "checker")]
# 3. Build the graph
graph = build_property_graph(
agents,
workflow_edges=workflow_edges,
query="What is 25 Γ 17?",
)
# 4. Define an LLM call function
def my_llm_caller(prompt: str) -> str:
# Integrate your LLM here (OpenAI, Anthropic, local, etc.)
return call_your_llm(prompt)
# 5. Run execution
runner = MACPRunner(llm_caller=my_llm_caller)
result = runner.run_round(graph)
# 6. Get results
print(f"Answer: {result.final_answer}")
print(f"Execution order: {result.execution_order}")
print(f"Tokens used: {result.total_tokens}")
```
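For step 4, any `str -> str` callable works; the framework is provider-agnostic. As one possible sketch (an assumption, not the framework's own API), here is `my_llm_caller` built on the official OpenAI Python SDK with an illustrative model choice:
```python
from openai import OpenAI  # assumes `pip install openai` and OPENAI_API_KEY in the env

client = OpenAI()

def my_llm_caller(prompt: str) -> str:
    # The runner only needs a plain str -> str function.
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative; any chat model works
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content or ""
```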
### Quick Start: with monitoring (Callbacks)
```python
from execution import MACPRunner, RunnerConfig
from callbacks import (
StdoutCallbackHandler,
MetricsCallbackHandler,
collect_metrics,
)
# 1. Add callback handlers
config = RunnerConfig(
callbacks=[
StdoutCallbackHandler(show_outputs=True), # Console output
MetricsCallbackHandler(), # Metrics collection
]
)
runner = MACPRunner(llm_caller=my_llm_caller, config=config)
result = runner.run_round(graph)
# 2. Or use a context manager
with collect_metrics() as metrics:
result = runner.run_round(graph)
print(f"Total tokens: {metrics.total_tokens}")
print(f"Execution time: {metrics.total_duration_ms}ms")
print(f"Agent calls: {metrics.get_metrics()['agent_calls']}")
```
### Quick Start: multi-model (different LLM for each agent)
```python
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory
# 1. Create a builder and add agents with different models
builder = GraphBuilder()
# Agent 1: strong model for complex analysis
builder.add_agent(
agent_id="analyst",
display_name="Senior Analyst",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0,
max_tokens=2000,
)
# Agent 2: smaller model for formatting
builder.add_agent(
agent_id="formatter",
display_name="Report Formatter",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.3,
max_tokens=500,
)
# 2. Define edges
builder.add_workflow_edge("analyst", "formatter")
# 3. Set the query and build the graph
builder.add_task(query="Analyze Q4 sales")
graph = builder.build()
# 4. Create an LLM factory (automatically creates callers for each agent)
factory = LLMCallerFactory.create_openai_factory()
# 5. Run execution
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
# 6. Get the result
print(f"Final answer: {result.final_answer}")
print("Savings: use gpt-4 only for analysis, gpt-4o-mini for formatting")
```
### Quick Start: token optimization and dynamic topology
```python
from builder import GraphBuilder
from execution import (
MACPRunner, RunnerConfig, EarlyStopCondition, TopologyAction
)
# 1. Create a graph with explicit boundaries
builder = GraphBuilder()
builder.add_agent("input", persona="Input processor")
builder.add_agent("solver", persona="Problem solver")
builder.add_agent("checker", persona="Solution checker")
builder.add_agent("expert", persona="Expert reviewer (expensive)")
builder.add_agent("output", persona="Output formatter")
builder.add_agent("optional", persona="Optional analyzer")
builder.add_workflow_edge("input", "solver")
builder.add_workflow_edge("solver", "checker")
builder.add_workflow_edge("checker", "output")
# expert is connected dynamically when needed
# Set boundaries (for filtering unreachable nodes)
builder.set_start_node("input")
builder.set_end_node("output")
builder.add_task(query="Solve the problem")
builder.connect_task_to_agents()
graph = builder.build()
# 2. Disable optional nodes
graph.disable("optional") # Will not run, token savings
# 3. Hook for topology adaptation
def adaptive_hook(ctx, graph):
    # If the checker found an error → add the expert
if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""):
return TopologyAction(
add_edges=[("checker", "expert", 1.0), ("expert", "output", 1.0)],
trigger_rebuild=True
)
    # If the solver is confident → skip the checker
if ctx.agent_id == "solver" and "CONFIDENT" in (ctx.response or ""):
return TopologyAction(skip_agents=["checker"])
return None
# 4. Configure runner with optimization
config = RunnerConfig(
adaptive=True,
enable_dynamic_topology=True,
topology_hooks=[adaptive_hook],
early_stop_conditions=[
EarlyStopCondition.on_keyword("FINAL_ANSWER"),
EarlyStopCondition.on_token_limit(5000),
],
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# 5. Execute with filtering unreachable nodes
result = runner.run_round(
graph,
filter_unreachable=True # Exclude nodes not on the input->output path
)
# 6. Result
print(f"Executed: {result.execution_order}")
print(f"Pruned: {result.pruned_agents}") # optional + unreachable
print(f"Early stopped: {result.early_stopped}")
print(f"Topology mods: {result.topology_modifications}") # was expert added?
print(f"Tokens: {result.total_tokens}")
```
---
## Key Concepts
### Pydantic-oriented architecture
gMAS Framework is **fully built on Pydantic** for type safety, validation, and data serialization. All key models inherit from `pydantic.BaseModel`:
#### Core Pydantic models in the framework
| Model | Purpose | Notes |
|--------|-----------|-------------|
| `AgentProfile` | Agent profile | `frozen=True` (immutable), `arbitrary_types_allowed` for torch.Tensor |
| `AgentLLMConfig` | Agent LLM configuration | Validates model parameters, supports env vars |
| `TaskNode` | Task node | Stores the query and task context |
| `GraphSchema` | Schema of the whole graph | Nodes (dict), edges (list), metadata |
| `AgentNodeSchema` | Agent-node schema | LLM config, tools, metrics, embeddings |
| `TaskNodeSchema` | Task-node schema | Query, status, deadline |
| `BaseEdgeSchema` | Base edge schema | Weight, probability, cost metrics |
| `WorkflowEdgeSchema` | Workflow edge | Conditions, priority, transformations |
| `CostMetrics` | Cost metrics | Tokens, latency, trust, reliability |
| `LLMConfig` | Full LLM configuration | Model name, base URL, API key, generation parameters |
| `VisualizationStyle` | Visualization styles | Settings for colors, shapes, what to show |
| `NodeStyle` | Node style | Shape, colors, icon |
| `EdgeStyle` | Edge style | Line style, arrow, colors |
| `ValidationResult` | Validation result | Errors, warnings |
| `FeatureConfig` | GNN configuration | Feature dimensions |
| `TrainingConfig` | Training configuration | Learning rate, epochs, optimizer |
#### Benefits of Pydantic in gMAS
1. **Automatic type validation**
```python
# Pydantic automatically checks types
agent = AgentProfile(
agent_id="test", # str - OK
display_name="Test Agent", # str - OK
tools=["search", "calc"], # list[str] - OK
)
# Validation error for a wrong type
agent = AgentProfile(agent_id=123)  # raises ValidationError: agent_id must be str
```
2. **Default values**
```python
# Pydantic fills fields with default values
agent = AgentProfile(agent_id="test", display_name="Test")
print(agent.tools) # [] (empty list by default)
print(agent.persona) # "" (empty string by default)
```
3. **Automatic type conversion**
```python
# Pydantic validators can automatically convert types
schema = AgentNodeSchema(
id="test",
    embedding=torch.tensor([0.1, 0.2, 0.3])  # torch.Tensor → list[float]
)
print(type(schema.embedding)) # <class 'list'>
```
4. **Nested models**
```python
# Pydantic validates nested models
agent = AgentProfile(
agent_id="test",
display_name="Test",
llm_config=AgentLLMConfig( # Nested Pydantic model
model_name="gpt-4",
temperature=0.7,
)
)
```
5. **Serialization and deserialization**
```python
# Built-in Pydantic methods
data = agent.model_dump()  # → dict
json_str = agent.model_dump_json(indent=2)  # → JSON string
# Load from dict/JSON
loaded = AgentProfile.model_validate(data)
loaded_json = AgentProfile.model_validate_json(json_str)
```
6. **Immutability**
```python
# frozen=True for AgentProfile
agent = AgentProfile(agent_id="test", display_name="Test")
agent.agent_id = "new_id" # β ValidationError: frozen model
# Use copy methods for changes
updated = agent.model_copy(update={"display_name": "New Name"})
```
7. **Extensibility**
```python
# extra="allow" enables arbitrary fields
schema = GraphSchema(
name="MyGraph",
custom_field="custom_value", # Additional field
another_field=123, # Another one
)
```
### Declarative typing
Thanks to Pydantic, all types are declarative and are checked both statically (mypy, pyright) and dynamically (at runtime):
```python
from core import AgentProfile
from core.schema import AgentNodeSchema, LLMConfig
# Static typing (IDE autocompletion)
agent: AgentProfile = AgentProfile(...)
config: LLMConfig = LLMConfig(...)
schema: AgentNodeSchema = AgentNodeSchema(...)
# Dynamic validation (runtime)
try:
    bad_agent = AgentProfile(agent_id=None)  # invalid: None instead of str
except ValidationError as e:
print(e.errors()) # Detailed error information
```
---
### Decentralized data storage
Unlike centralized architectures, gMAS uses a **decentralized** approach:
- **Embeddings** are stored inside `AgentProfile.embedding`
- **Hidden states** are stored inside `AgentProfile.hidden_state`
- **Local memory** is stored inside `AgentProfile.state`
- `RoleGraph.embeddings` is an accessor that gathers embeddings from all agents into a single tensor
This allows each agent to own its representations and ensures node independence.
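A short sketch of what this looks like in practice, reusing the Quick Start setup (whether `embedding` is populated before an encoder runs is an assumption):
```python
from core import AgentProfile
from builder import build_property_graph

agents = [
    AgentProfile(agent_id="solver", display_name="Math Solver"),
    AgentProfile(agent_id="checker", display_name="Answer Checker"),
]
graph = build_property_graph(agents, workflow_edges=[("solver", "checker")], query="demo")

# Each agent owns its own representations...
solver = graph.get_agent_by_id("solver")
print(solver.embedding)     # this agent's embedding (may be None until encoded)
print(solver.hidden_state)  # this agent's hidden state
print(solver.state)         # this agent's local memory

# ...while the graph-level accessor just stacks them into an (N x dim) tensor.
print(graph.embeddings)
```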
### System architecture
```
┌───────────────────────────────────────────────────────────────┐
│                           RoleGraph                           │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐    │
│  │  Agent   │───│  Agent   │───│  Agent   │───│  Agent   │    │
│  │ Profile  │   │ Profile  │   │ Profile  │   │ Profile  │    │
│  │(embedding│   │(embedding│   │(embedding│   │(embedding│    │
│  │  state)  │   │  state)  │   │  state)  │   │  state)  │    │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘    │
│       │              │              │             │           │
│       └──────────────┴──────────────┴─────────────┘           │
│                   Adjacency matrix (A_com)                    │
└───────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌───────────────────────────────────────────────────────────────┐
│                          MACPRunner                           │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐         │
│   │  Scheduler  │   │   Memory    │   │   Budget    │         │
│   │             │   │    Pool     │   │   Tracker   │         │
│   └─────────────┘   └─────────────┘   └─────────────┘         │
└───────────────────────────────────────────────────────────────┘
                               │
                               ▼
                     ┌─────────────────┐
                     │   MACPResult    │
                     │  • messages     │
                     │  • final_answer │
                     │  • metrics      │
                     └─────────────────┘
```
### Data flow
1. **Create agents** → `AgentProfile` describes the role, capabilities, and tools
2. **Build the graph** → `build_property_graph` creates a `RoleGraph` with the topology
3. **Planning** → `Scheduler` determines the execution order
4. **Execution** → `MACPRunner` runs agents sequentially/in parallel
5. **Result** → `MACPResult` contains all agents' responses and metrics
---
## Core Components
### RoleGraph
`RoleGraph` is the central data structure representing the agent graph.
```python
from core import RoleGraph
# === Graph properties ===
graph.num_nodes # Number of nodes
graph.num_edges # Number of edges
graph.agents # List of AgentProfile objects
graph.node_ids # List of node IDs ["agent1", "agent2", ...]
graph.role_sequence # Role order (legacy)
graph.A_com # Adjacency matrix (torch.Tensor, N x N)
graph.edge_index # Edge index in PyG format (torch.Tensor, 2 x E)
graph.edge_attr # Edge attributes (torch.Tensor, E x feature_dim)
graph.embeddings # Accessor: gathers agent embeddings into a tensor (N x dim)
graph.graph # Internal rustworkx.PyDiGraph object
graph.task_node # TaskNode if enabled, otherwise None
graph.query # Task query (string)
# === Node operations ===
# Add a node
graph.add_node(
    agent,                      # AgentProfile
    connections_to=["other"],   # List of IDs for outgoing edges
    connections_from=["prev"],  # List of IDs for incoming edges
    weight=1.0,                 # Default edge weight
)
# Remove a node with a state migration policy
graph.remove_node(
    "agent_id",
    policy=StateMigrationPolicy.ARCHIVE,  # DISCARD, COPY, ARCHIVE
)
# Replace a node
graph.replace_node(
    old_node_id="old",
    new_agent=new_agent_profile,
    policy=StateMigrationPolicy.COPY,  # Copy state
    keep_connections=True,             # Preserve edges
)
# Get an agent
agent = graph.get_agent_by_id("agent_id")
# Get node index in the matrix
idx = graph.get_node_index("agent_id") # -> int
# Existence check
if "agent_id" in graph.node_ids:
    ...
# === Edge operations ===
# Add an edge
graph.add_edge(
    source="agent1",
    target="agent2",
    weight=0.8,
    edge_type="workflow",      # Edge type (optional)
    metadata={"priority": 1},  # Additional data
)
# Remove an edge
graph.remove_edge("agent1", "agent2")
# Update edge weight
graph.update_edge_weight("agent1", "agent2", new_weight=0.9)
# Get neighbors
out_neighbors = graph.get_neighbors("agent_id", direction="out") # Outgoing
in_neighbors = graph.get_neighbors("agent_id", direction="in") # Incoming
all_neighbors = graph.get_neighbors("agent_id", direction="both") # All
# Check whether an edge exists
has_edge = graph.has_edge("agent1", "agent2")
# Get edge weight
weight = graph.get_edge_weight("agent1", "agent2")
# === Execution bounds (start/end nodes) ===
# Set start and end nodes for optimization
graph.set_start_node("input_agent")
graph.set_end_node("output_agent")
# Or set both at once
graph.set_execution_bounds("input_agent", "output_agent")
# Inspect bounds
print(f"Start: {graph.start_node}, End: {graph.end_node}")
# === Disabled nodes ===
# Disable nodes (they remain in the graph but will not be executed)
graph.disable("agent1") # One node
graph.disable(["agent2", "agent3"]) # Multiple nodes
# Enable back
graph.enable("agent1") # One node
graph.enable(["agent2", "agent3"]) # Multiple nodes
graph.enable() # All disabled nodes
# Check status
graph.is_enabled("agent1") # -> bool
graph.get_enabled() # -> ["agent1", ...]
graph.get_disabled() # -> ["agent2", ...]
# Use case: token savings based on algorithms
if rl_model.predict(graph_state) < threshold:
    graph.disable("expensive_agent")
# === Reachability analysis ===
# Get nodes reachable from start_node
reachable = graph.get_reachable_from("input_agent")
# Get nodes that can reach end_node
reaching = graph.get_nodes_reaching("output_agent")
# Get relevant nodes (on the path start -> end)
relevant = graph.get_relevant_nodes()
# Automatically uses graph.start_node and graph.end_node
# Get isolated nodes (not on the path start -> end)
isolated = graph.get_isolated_nodes()
# Optimized execution order (without isolated nodes)
order = graph.get_optimized_execution_order()
# === Conditional edges ===
# Add an edge with a condition
from execution.scheduler import ConditionContext
def condition_func(context: ConditionContext) -> bool:
    return context.state.get("quality") > 0.8
graph.add_conditional_edge(
    source="writer",
    target="editor",
    condition=condition_func,
    weight=0.9,
)
# === Dynamic topology updates ===
# Full update of the adjacency matrix
graph.update_communication(
    a_new,                   # New adjacency matrix (torch.Tensor)
    s_tilde=scores,          # Quality score matrix (optional)
    p_matrix=probabilities,  # Transition probability matrix (optional)
)
# === Conversion and export ===
# Serialize to a dictionary
data = graph.to_dict()
# {
# "agents": [...],
# "adjacency": [[...]],
# "query": "...",
# "task_node": {...},
# }
# Convert to PyTorch Geometric Data
pyg_data = graph.to_pyg_data()
# Data(x=node_features, edge_index=edges, edge_attr=weights)
# Extract a subgraph
subgraph = graph.subgraph(["agent1", "agent2", "agent3"])
# Copy the graph
graph_copy = graph.copy()
# === Integrity checks ===
# Verify consistency of internal structures
graph.verify_integrity(raise_on_error=True)
# Quick check
is_valid = graph.is_consistent()
# === Graph analysis ===
# Check whether it is a DAG (directed acyclic graph)
is_dag = graph.is_dag()
# Get topological order (if DAG)
if graph.is_dag():
    topo_order = graph.topological_sort()
# === Agent updates ===
# Update an agent's embedding
agent = graph.get_agent_by_id("solver")
agent = agent.with_embedding(new_embedding)
graph.update_agent("solver", agent)
# Update an agent's state
agent = agent.append_state({"role": "assistant", "content": "Response"})
graph.update_agent("solver", agent)
# === Batch operations ===
# Update multiple agents
updates = {
    "agent1": updated_agent1,
    "agent2": updated_agent2,
}
graph.batch_update_agents(updates)
# Add multiple edges
edges = [
    ("a", "b", 0.8),
    ("b", "c", 0.9),
    ("c", "d", 0.7),
]
graph.batch_add_edges(edges)
```
#### State migration policies
When removing or replacing a node, you can specify a migration policy:
```python
from core.graph import StateMigrationPolicy
# DISCARD – state is removed
graph.remove_node("agent_id", policy=StateMigrationPolicy.DISCARD)
# COPY – state is copied into the new node
graph.replace_node("old_id", new_agent, policy=StateMigrationPolicy.COPY)
# ARCHIVE – state is saved to external storage
graph.remove_node("agent_id", policy=StateMigrationPolicy.ARCHIVE)
```
---
### AgentProfile
`AgentProfile` is an **immutable Pydantic model** (`BaseModel` with `frozen=True`) representing an agent profile with description, tools, state, and LLM configuration.
> **Important**:
> - `AgentProfile` inherits from `pydantic.BaseModel`, providing **automatic type validation** and **type safety**
> - Embeddings and hidden states are stored **at the agent level**, not at the graph level
> - **Multi-model support** – each agent can have its own LLM configuration
> - Immutability (`frozen=True`) – methods return new objects
#### AgentProfile structure (Pydantic model)
| Field | Type | Description |
|------|-----|----------|
| `agent_id` | `str` | Unique agent identifier (required) |
| `display_name` | `str` | Display name (required) |
| `persona` | `str` | Agent role/persona (e.g., "Expert analyst") |
| `description` | `str` | Textual description of agent capabilities |
| `llm_backbone` | `str \| None` | LLM model identifier (legacy; use `llm_config`) |
| `llm_config` | `AgentLLMConfig \| None` | **Pydantic model** for the agent's LLM configuration |
| `tools` | `list[str]` | List of available tools (shell, code_interpreter, file_search, web_search, custom) |
| `raw` | `Mapping[str, Any]` | Arbitrary extra data |
| `embedding` | `torch.Tensor \| None` | Agent vector representation (arbitrary_types_allowed) |
| `state` | `list[dict[str, Any]]` | Local state / message history |
| `hidden_state` | `torch.Tensor \| None` | Hidden state passed between agents |
#### AgentLLMConfig (Pydantic model)
```python
from core.agent import AgentLLMConfig
# AgentLLMConfig - a Pydantic model for LLM configuration
llm_config = AgentLLMConfig(
    model_name="gpt-4",                      # Model name
    base_url="https://api.openai.com/v1",    # API endpoint
    api_key="$OPENAI_API_KEY",               # Key (or $ENV_VAR)
    max_tokens=2000,                         # Max tokens
    temperature=0.7,                         # Temperature
    timeout=60.0,                            # Timeout in seconds
    top_p=0.9,                               # Top-p sampling
    stop_sequences=["END", "STOP"],          # Stop sequences
    extra_params={"frequency_penalty": 0.5}, # Extra parameters
)
# AgentLLMConfig methods
api_key = llm_config.resolve_api_key() # Resolve $ENV_VAR
is_set = llm_config.is_configured() # Check whether configured
params = llm_config.to_generation_params() # Build params for the LLM
```
#### Creating and working with AgentProfile
```python
from core import AgentProfile
from core.agent import AgentLLMConfig
# 1. Basic creation (Pydantic validates types)
agent = AgentProfile(
    agent_id="analyzer",            # Unique ID (str, required)
    display_name="Data Analyzer",   # Display name (str, required)
    persona="Expert data analyst",  # Role/persona (str, default="")
    description="Analyzes data and produces insights",  # Description (str, default="")
    tools=["python", "sql"],        # Available tools (list[str], default=[])
)
# 2. Creation with LLM config (Pydantic model)
llm_config = AgentLLMConfig(
    model_name="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",  # Resolved from environment
    temperature=0.7,
    max_tokens=2000,
)
agent = AgentProfile(
    agent_id="researcher",
    display_name="Researcher",
    llm_config=llm_config,  # Pydantic validates the nested model
    tools=["web_search"],
)
# 3. State operations (immutable β returns a NEW object)
agent = agent.append_state({"role": "user", "content": "Hello!"})
agent = agent.with_state([{"role": "system", "content": "You are helpful"}])
agent = agent.clear_state()
# 4. Embeddings (arbitrary_types_allowed for torch.Tensor)
import torch
embedding = torch.randn(384)
agent = agent.with_embedding(embedding)
hidden_state = torch.randn(768)
agent = agent.with_hidden_state(hidden_state)
# 5. LLM config operations
agent = agent.with_llm_config(llm_config)
# Get the agent model name (priority: llm_config.model_name → llm_backbone)
model_name = agent.get_model_name()  # "gpt-4"
# Check if a custom LLM configuration is set
if agent.has_custom_llm():
    print(f"Agent uses custom LLM: {agent.llm_config.model_name}")
    print(f"Base URL: {agent.llm_config.base_url}")
    print(f"Generation params: {agent.llm_config.to_generation_params()}")
# 6. Serialization (Pydantic methods)
# For encoder (text)
text = agent.to_text()
# For persistence (dict, includes llm_config)
data = agent.to_dict()
# Pydantic serialization methods
agent_dict = agent.model_dump() # Dict[str, Any]
agent_json = agent.model_dump_json(indent=2) # JSON string
# 7. Deserialization (Pydantic methods)
loaded_agent = AgentProfile.model_validate(agent_dict)
loaded_from_json = AgentProfile.model_validate_json(agent_json)
```
#### Example: agents with different LLMs
```python
from core import AgentProfile
from core.agent import AgentLLMConfig
# Agent 1: strong model for analysis
analyst = AgentProfile(
    agent_id="analyst",
    display_name="Senior Analyst",
    persona="Expert data analyst with 10 years experience",
    description="Performs deep analysis of complex data",
    llm_config=AgentLLMConfig(
        model_name="gpt-4",
        base_url="https://api.openai.com/v1",
        api_key="$OPENAI_API_KEY",
        temperature=0.0,  # Deterministic for analysis
        max_tokens=2000,
    ),
    tools=["python", "sql", "visualization"],
)
# Agent 2: cheaper model for formatting
formatter = AgentProfile(
    agent_id="formatter",
    display_name="Report Formatter",
    persona="Technical writer",
    description="Formats analysis results into readable reports",
    llm_config=AgentLLMConfig(
        model_name="gpt-4o-mini",  # Cheaper for simple tasks
        base_url="https://api.openai.com/v1",
        api_key="$OPENAI_API_KEY",
        temperature=0.3,
        max_tokens=500,
    ),
    tools=["markdown", "latex"],
)
# Agent 3: local model
local_agent = AgentProfile(
    agent_id="local_llm",
    display_name="Local Assistant",
    llm_config=AgentLLMConfig(
        model_name="llama3:70b",
        base_url="http://localhost:11434/v1",  # Ollama
        temperature=0.5,
    ),
)
```
#### Benefits of Pydantic validation
1. **Automatic type checking** when creating objects
2. **Default values** for optional fields
3. **Immutability** (`frozen=True`) prevents accidental changes
4. **Nested models** (`AgentLLMConfig` is validated automatically)
5. **Serialization/deserialization** via `.model_dump()` and `.model_validate()`
6. **Support for arbitrary types** (`arbitrary_types_allowed`) for torch.Tensor; the sketch below demonstrates points 3 and 4 directly
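A minimal demonstration of the frozen and nested-model behaviors (the invalid `temperature` value is deliberate):
```python
from pydantic import ValidationError
from core import AgentProfile
from core.agent import AgentLLMConfig

# Nested model: a non-numeric temperature is rejected at construction time
try:
    AgentLLMConfig(model_name="gpt-4", temperature="hot")
except ValidationError as e:
    print(e.errors()[0]["loc"])  # ('temperature',)

# Frozen model: attribute assignment after creation is rejected
agent = AgentProfile(agent_id="demo", display_name="Demo")
try:
    agent.display_name = "Other"
except ValidationError:
    print("frozen: use agent.model_copy(update={...}) instead")
```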
---
### TaskNode
`TaskNode` is an **immutable Pydantic model** (`BaseModel` with `frozen=True`) representing a virtual task node that stores the task query and can be connected to all agents.
> **Important**: `TaskNode` inherits from `pydantic.BaseModel`, providing automatic type validation and immutability (just like `AgentProfile`).
#### TaskNode structure (Pydantic model)
| Field | Type | Description |
|------|-----|----------|
| `agent_id` (`id`) | `str` | Task node identifier (default `__task__`) |
| `type` | `str` | Node type (`"task"`, automatically) |
| `query` | `str` | Task statement / query |
| `description` | `str` | Additional context description |
| `embedding` | `torch.Tensor \| None` | Task embedding (arbitrary_types_allowed) |
| `display_name` | `str` | Display name (default `"Task"`) |
| `persona` | `str` | Task persona/role (default empty) |
| `llm_backbone` | `str \| None` | Model identifier, if needed |
| `tools` | `list[str]` | Tools available to the task node (default=[]) |
| `state` | `list[dict[str, Any]]` | Local task state / message history (default=[]) |
```python
from core import TaskNode
# Pydantic validates types on creation
task = TaskNode(
    agent_id="__task__",                                # can be overridden (str)
    query="Draft a market research plan",               # required (str)
    description="A task for the whole team of agents",  # optional (str, default="")
)
# Task embedding (optional, arbitrary_types_allowed for torch.Tensor)
import torch
task_embedding = torch.randn(384)
task = task.with_embedding(task_embedding)
# TaskNode is immutable (frozen=True), use copy methods
updated_task = task.model_copy(update={"description": "New description"})
# Pydantic serialization
task_dict = task.model_dump()
task_json = task.model_dump_json(indent=2)
# Deserialization
loaded = TaskNode.model_validate(task_dict)
```
> When using `build_property_graph(..., include_task_node=True)`, the task node is created automatically and connected to agents via context/update edges.
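A minimal sketch of that flow (the `build_property_graph` import path is an assumption, mirroring the usage shown later in this document):
```python
from core import AgentProfile
from builder import build_property_graph  # import path assumed

agents = [AgentProfile(agent_id="planner", display_name="Planner")]
graph = build_property_graph(
    agents,
    query="Draft a market research plan",
    include_task_node=True,
)
# The virtual task node is created and wired to the agents automatically
assert graph.task_node is not None
print(graph.task_node.query)  # "Draft a market research plan"
```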
#### TaskNode methods (immutable)
```python
# Embedding operations (returns a new object)
task = task.with_embedding(embedding_tensor)
# State operations (returns a new object)
task = task.append_state({"role": "system", "content": "Context"})
task = task.with_state([{"role": "user", "content": "Query"}])
task = task.clear_state()
# Convert to text
task_text = task.to_text() # For encoder
# Convert to dict
task_data = task.to_dict() # For persistence
```
---
### NodeEncoder
`NodeEncoder` converts textual agent descriptions into vector representations.
```python
from core import NodeEncoder
# sentence-transformers (recommended)
encoder = NodeEncoder(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    normalize_embeddings=True,
)
# hash fallback (fast, no model required)
encoder = NodeEncoder(model_name="hash:256")
# Encode texts
texts = [agent.to_text() for agent in agents]
embeddings = encoder.encode(texts) # torch.Tensor (N x dim)
# Get dimensionality
dim = encoder.embedding_dim
```
---
### MACPRunner
`MACPRunner` is the executor of the Multi-Agent Communication Protocol.
```python
from execution import MACPRunner, RunnerConfig
# ✅ Recommended for modern chat LLMs (OpenAI, GigaChat, etc.)
# Sends proper system/user roles – no flat-string workaround needed.
from openai import OpenAI
client = OpenAI(api_key="sk-...")
def my_structured_caller(messages: list[dict]) -> str:
    resp = client.chat.completions.create(model="gpt-4o", messages=messages)
    return resp.choices[0].message.content or ""
runner = MACPRunner(structured_llm_caller=my_structured_caller)
# Legacy setup – one flat-string LLM for all agents (still supported)
runner = MACPRunner(
    llm_caller=sync_llm_function,         # Callable[[str], str]
    async_llm_caller=async_llm_function,  # Callable[[str], Awaitable[str]]
    token_counter=my_token_counter,       # Token counting
)
# Multi-model setup (different LLMs for different agents)
from execution import LLMCallerFactory, create_openai_caller
# Option 1: Use a factory (recommended)
factory = LLMCallerFactory.create_openai_factory(
    default_model="gpt-4o-mini",
    default_base_url="https://api.openai.com/v1",
)
runner = MACPRunner(llm_factory=factory)
# Option 2: A dictionary of callers per agent
runner = MACPRunner(
    llm_callers={
        "analyst": create_openai_caller(model="gpt-4", temperature=0.0),
        "writer": create_openai_caller(model="gpt-4o-mini", temperature=0.7),
    },
    async_llm_callers={
        "analyst": create_openai_caller(model="gpt-4", is_async=True),
        "writer": create_openai_caller(model="gpt-4o-mini", is_async=True),
    },
)
# Option 3: Combined (factory + overrides for specific agents)
runner = MACPRunner(
    llm_factory=factory,                                 # Default for everyone
    llm_callers={"critical_agent": specialized_caller},  # Override for critical_agent
)
# Advanced configuration
config = RunnerConfig(
    timeout=60.0,                         # Per-agent timeout
    adaptive=True,                        # Adaptive mode
    enable_parallel=True,                 # Parallel execution
    max_parallel_size=5,                  # Max parallel agents
    max_retries=2,                        # Retries on errors
    update_states=True,                   # Update agent states
    enable_memory=True,                   # Enable memory
    callbacks=[StdoutCallbackHandler()],  # Callbacks for logging
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# Synchronous execution
result = runner.run_round(graph)
# With explicit execution bounds and filtering
result = runner.run_round(
    graph,
    start_agent_id="input",   # Start agent (overrides graph.start_node)
    final_agent_id="output",  # Final agent (overrides graph.end_node)
    filter_unreachable=True,  # Exclude isolated nodes (token savings)
    update_states=True,       # Update agent states
)
# Asynchronous execution
result = await runner.arun_round(
    graph,
    start_agent_id="input",
    final_agent_id="output",
    filter_unreachable=True,
)
# Execution with hidden channels
result = runner.run_round_with_hidden(graph, hidden_encoder=encoder)
```
#### RunnerConfig (full specification)
```python
from execution import (
    RunnerConfig, RoutingPolicy, PruningConfig, BudgetConfig,
    ErrorPolicy, ErrorAction, EarlyStopCondition,
)
from utils.memory import MemoryConfig
config = RunnerConfig(
    # === Basic parameters ===
    timeout=60.0,        # Per-agent timeout (sec)
    max_retries=3,       # Max attempts on errors
    update_states=True,  # Update AgentProfile.state
    # === Adaptive mode ===
    # adaptive controls conditional edges, pruning, fallback, and routing
    # policies. It does NOT affect whether agents run in parallel.
    adaptive=True,  # Enable conditional routing & pruning
    routing_policy=RoutingPolicy.WEIGHTED_TOPO,  # Routing policy
    # === Parallel execution ===
    # enable_parallel works independently of adaptive: when True,
    # independent agents (those with all predecessors done) are executed
    # concurrently via asyncio.gather. Works with both astream() and
    # arun_round(), regardless of the adaptive flag.
    enable_parallel=True,  # Parallel group execution
    max_parallel_size=5,   # Max agents in a parallel group
    # === Pruning ===
    pruning_config=PruningConfig(
        min_weight_threshold=0.1,        # Min edge weight
        min_probability_threshold=0.05,  # Min transition probability
        max_consecutive_errors=3,        # Max consecutive errors
        token_budget=10000,              # Token budget for pruning
        enable_fallback=True,            # Use fallback agents
        max_fallback_attempts=2,         # Max fallback attempts
        quality_scorer=None,             # Quality scoring function
        min_quality_threshold=0.3,       # Min quality to continue
    ),
    # === Budget ===
    budget_config=BudgetConfig(
        total_token_limit=50000,
        node_token_limit=2000,
        max_prompt_length=4000,
        max_response_length=2000,
        warn_at_usage_ratio=0.8,
        total_time_limit_seconds=600,
        total_request_limit=100,
    ),
    # === Memory ===
    enable_memory=True,  # Enable memory system
    memory_config=MemoryConfig(
        working_max_entries=20,
        long_term_max_entries=100,
        working_default_ttl=3600.0,
        auto_compress=True,
        promote_after_accesses=3,
    ),
    memory_context_limit=5,  # Memory entries injected into the prompt
    # === Hidden channels ===
    enable_hidden_channels=True,     # Passing hidden_state
    hidden_combine_strategy="mean",  # mean, sum, concat, attention
    pass_embeddings=True,            # Pass embeddings
    # === Task query broadcast ===
    broadcast_task_to_all=True,  # True: task query is sent to all agents
                                 # False: only to agents connected to the task node
    # === Dynamic topology (runtime modification) ===
    enable_dynamic_topology=True,        # Enable runtime graph modifications
    topology_hooks=[my_hook_func],       # Sync hooks for topology modification
    async_topology_hooks=[async_hook],   # Async hooks for topology modification
    early_stop_conditions=[              # Early stopping conditions
        EarlyStopCondition.on_keyword("FINAL ANSWER"),
        EarlyStopCondition.on_token_limit(10000),
        EarlyStopCondition.on_custom(lambda ctx: my_logic(ctx)),
    ],
    # === Callbacks (monitoring and logging) ===
    callbacks=[                    # Callback handlers
        StdoutCallbackHandler(     # Console output
            show_prompts=False,
            show_outputs=True,
        ),
        MetricsCallbackHandler(),          # Metrics aggregation
        FileCallbackHandler("run.jsonl"),  # File logging
    ],
    # === Error handling ===
    error_policy=ErrorPolicy(
        on_timeout=ErrorAction.RETRY,
        on_retry_exhausted=ErrorAction.PRUNE,
        on_budget_exceeded=ErrorAction.ABORT,
        on_validation_error=ErrorAction.ABORT,
    ),
    # === Streaming ===
    enable_token_streaming=False,  # Enable token-level streaming if LLM supports it
)
```
#### Execution result (MACPResult)
```python
result.messages # Dict[agent_id -> response]
result.final_answer # Final agent answer
result.final_agent_id # Final agent ID
result.execution_order # Execution order
result.agent_states # Updated agent states
result.total_tokens # Total tokens
result.total_time # Execution time (sec)
result.topology_changed_count # Number of topology changes
result.fallback_count # Number of fallbacks
result.pruned_agents # Pruned agents (including disabled and isolated)
result.errors # List of errors
result.hidden_states # Agents' hidden states
result.metrics # ExecutionMetrics with detailed statistics
# New fields (dynamic topology)
result.early_stopped # bool: whether early stopping occurred
result.early_stop_reason # str: early stop reason
result.topology_modifications # int: number of topology modifications
```
---
### Scheduler
The scheduler determines the agent execution order.
```python
from execution import (
    build_execution_order,
    get_parallel_groups,
    AdaptiveScheduler,
    RoutingPolicy,
    PruningConfig,
)
# Simple topological order
order = build_execution_order(graph.A_com, agent_ids)
# Parallel execution groups
groups = get_parallel_groups(graph.A_com, agent_ids)
# Result: [["a", "b"], ["c"], ["d", "e"]]
# Adaptive scheduler
scheduler = AdaptiveScheduler(
    policy=RoutingPolicy.WEIGHTED_TOPO,  # Routing policy
    pruning_config=PruningConfig(
        min_weight_threshold=0.1,        # Min edge weight
        min_probability_threshold=0.05,  # Min probability
        max_consecutive_errors=3,        # Max consecutive errors
        token_budget=10000,              # Token budget
        enable_fallback=True,            # Enable fallback
        max_fallback_attempts=2,         # Max fallback attempts
    ),
    beam_width=3,  # Beam search width
)
# Build a plan
plan = scheduler.build_plan(
    a_agents,           # Agent adjacency matrix
    agent_ids,          # List of IDs
    p_matrix=probs,     # Probability matrix
    end_agent="final",  # Final agent
)
# Working with the plan
step = plan.get_current_step()
plan.mark_completed("agent_id", tokens=100)
plan.mark_failed("agent_id")
plan.mark_skipped("agent_id")
```
#### Routing policies (detailed)
```python
from execution import RoutingPolicy, AdaptiveScheduler
# ========== 1. TOPOLOGICAL (Topological sort) ==========
# Description: Classic topological sort for a DAG
# Use case: Simple pipelines without adaptivity
# Complexity: O(V + E)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.TOPOLOGICAL)
plan = scheduler.build_plan(adjacency, agent_ids)
# Example:
# A → B → C → D
# Order: [A, B, C, D]
# ========== 2. WEIGHTED_TOPO (Weighted topological) ==========
# Description: Topological sort with priority based on edge weights
# Use case: When you need to account for connection importance
# Complexity: O(V + E log V)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.WEIGHTED_TOPO)
plan = scheduler.build_plan(adjacency, agent_ids)
# Example:
#    ┌─(0.9)→ B ─┐
# A ─┤           ├→ D
#    └─(0.3)→ C ─┘
# Order: [A, B, C, D] (B runs before C because 0.9 > 0.3)
# ========== 3. GREEDY (Greedy selection) ==========
# Description: At each step, selects the agent with the maximum edge weight
# Use case: Optimize for connection quality
# Complexity: O(V²)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.GREEDY)
plan = scheduler.build_plan(
    adjacency,
    agent_ids,
    start_node="coordinator",
    end_node="final",
)
# Example:
# Start → A(0.9) → B(0.8) → End
# Start → C(0.5) → D(0.7) → End
# Selected: Start → A → B → End (higher total weight)
# ========== 4. BEAM_SEARCH (Beam search) ==========
# Description: Keeps beam_width best paths and selects the optimal one
# Use case: Balance between quality and speed
# Complexity: O(V * beam_width * E)
scheduler = AdaptiveScheduler(
    policy=RoutingPolicy.BEAM_SEARCH,
    beam_width=3,  # Keep 3 best paths
)
plan = scheduler.build_plan(
    adjacency,
    agent_ids,
    p_matrix=probability_matrix,  # Transition probabilities
)
# Example with beam_width=2:
# Start ─┬→ A(0.8) ─┬→ B(0.9) → End   [path 1: 0.72]
#        │          └→ C(0.6) → End   [path 2: 0.48]
#        └→ D(0.7) ──→ E(0.8) → End   [path 3: 0.56]
# Beam keeps paths 1 and 3, drops path 2
# Final choice: path 1
# ========== 5. K_SHORTEST (K shortest paths) ==========
# Description: Finds K shortest paths and selects the best by a criterion
# Use case: When alternative routes are required
# Complexity: O(K * (V + E) log V)
scheduler = AdaptiveScheduler(
    policy=RoutingPolicy.K_SHORTEST,
    k_paths=5,  # Find 5 shortest paths
)
plan = scheduler.build_plan(
    adjacency,
    agent_ids,
    start_node="input",
    end_node="output",
    path_metric=PathMetric.WEIGHTED,  # HOP_COUNT, WEIGHTED, RELIABILITY
)
# Example:
# Found paths:
# 1. input → A → B → output (cost=3, hops=3)
# 2. input → C → output     (cost=4, hops=2)
# 3. input → A → D → output (cost=5, hops=3)
# 4. input → E → F → output (cost=6, hops=3)
# 5. input → G → output     (cost=7, hops=2)
# Selection by metric: path 1 (minimum cost)
# ========== 6. GNN_BASED (GNN-based) ==========
# Description: Uses a trained GNN to predict the optimal route
# Use case: Adaptive routing based on history
# Requires: A trained GNN model
from core.gnn import GNNRouterInference
scheduler = AdaptiveScheduler(
    policy=RoutingPolicy.GNN_BASED,
    gnn_router=gnn_inference,  # GNNRouterInference object
    gnn_threshold=0.7,         # Min confidence to use the GNN
)
# If confidence < threshold, fallback policy is used
scheduler.set_fallback_policy(RoutingPolicy.WEIGHTED_TOPO)
plan = scheduler.build_plan(
    adjacency,
    agent_ids,
    metrics_tracker=tracker,  # For GNN features
)
# ========== Policy comparison ==========
# | Policy         | Adaptivity | Complexity  | Quality | Use case                 |
# |----------------|------------|-------------|---------|--------------------------|
# | TOPOLOGICAL    | No         | O(V+E)      | ⭐       | Simple pipelines         |
# | WEIGHTED_TOPO  | Low        | O(V+E·logV) | ⭐⭐      | Priority-based pipelines |
# | GREEDY         | Medium     | O(V²)       | ⭐⭐⭐     | Weight-optimized routing |
# | BEAM_SEARCH    | High       | O(V·k·E)    | ⭐⭐⭐⭐    | Quality/speed balance    |
# | K_SHORTEST     | High       | O(K·V·logV) | ⭐⭐⭐⭐    | Alternative route search |
# | GNN_BASED      | Very high  | O(GNN)      | ⭐⭐⭐⭐⭐   | Trained systems          |
# ========== Choosing a policy based on the task ==========
# Simple linear pipeline
config = RunnerConfig(routing_policy=RoutingPolicy.TOPOLOGICAL)
# Graph with different agent priorities
config = RunnerConfig(routing_policy=RoutingPolicy.WEIGHTED_TOPO)
# Optimize route quality
config = RunnerConfig(routing_policy=RoutingPolicy.GREEDY)
# Balance exploration vs exploitation
config = RunnerConfig(
    routing_policy=RoutingPolicy.BEAM_SEARCH,
    adaptive=True,
)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.BEAM_SEARCH, beam_width=3)
# Need fallback alternatives
config = RunnerConfig(routing_policy=RoutingPolicy.K_SHORTEST)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.K_SHORTEST, k_paths=3)
# Advanced trained system
config = RunnerConfig(routing_policy=RoutingPolicy.GNN_BASED)
scheduler = AdaptiveScheduler(
    policy=RoutingPolicy.GNN_BASED,
    gnn_router=trained_router,
)
```
---
### Memory System
A stratified memory system with **working** and **long-term** levels, supporting TTL, tags, priorities, and automatic compression.
#### Memory architecture
```
┌─────────────────────────────────────────────────────────────┐
│                        AgentMemory                          │
│   ┌────────────────────┐       ┌──────────────────────┐     │
│   │   Working Memory   │       │   Long-term Memory   │     │
│   │   (TTL: 1 hour)    │       │      (TTL: ∞)        │     │
│   │   Max: 20 entries  │       │   Max: 100 entries   │     │
│   │                    │       │                      │     │
│   │ - Recent messages  │──────▶│ - Important facts    │     │
│   │ - Temp context     │       │ - Key insights       │     │
│   │ - Active tasks     │       │ - Historical data    │     │
│   └────────────────────┘       └──────────────────────┘     │
│             ▲                             ▲                 │
│             │          promotion          │                 │
│             │      (after N accesses)     │                 │
│             └─────────────────────────────┘                 │
└─────────────────────────────────────────────────────────────┘
                              │
                              │ sharing
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                     SharedMemoryPool                        │
│             Memory sharing between agents                   │
│             - Broadcast: one → all                          │
│             - Share: one → selected                         │
│             - Query: search by tags                         │
└─────────────────────────────────────────────────────────────┘
```
---
#### Basic usage of AgentMemory
```python
from utils.memory import (
    AgentMemory,
    MemoryConfig,
    MemoryLevel,
    MemoryEntry,
)
# 1. Memory configuration
config = MemoryConfig(
    # Working memory (short-term)
    working_max_entries=20,        # Max entries
    working_default_ttl=3600.0,    # TTL: 1 hour
    # Long-term memory
    long_term_max_entries=100,     # Max entries
    long_term_default_ttl=None,    # No expiration
    # Automatic management
    auto_compress=True,            # Auto-compress on limit overflow
    compress_strategy="truncate",  # truncate, summarize
    promote_after_accesses=3,      # Promote to long-term after N accesses
    # Prioritization
    use_priority=True,             # Consider priorities when evicting
    priority_weight=0.3,           # Priority weight vs recency
)
# 2. Create an agent memory
memory = AgentMemory("researcher", config)
# 3. Add entries
# 3.1. Add messages (the simplest way)
memory.add_message(role="user", content="Analyze the dataset")
memory.add_message(role="assistant", content="I will analyze it")
# 3.2. Add with parameters
memory.add(
    content={"type": "insight", "text": "Pattern detected in data"},
    level=MemoryLevel.WORKING,  # WORKING or LONG_TERM
    priority=5,                 # 0-10 (higher = more important)
    tags={"insight", "data"},   # Tags for search
    ttl=7200.0,                 # Custom TTL (2 hours)
    metadata={"source": "analysis", "confidence": 0.95},
)
# 3.3. Add directly into long-term
memory.add(
    content="Critical finding: correlation coefficient = 0.87",
    level=MemoryLevel.LONG_TERM,
    priority=10,
    tags={"critical", "finding"},
)
# 4. Retrieve entries
# 4.1. Get recent messages
messages = memory.get_messages(limit=5)
for msg in messages:
    print(f"{msg['role']}: {msg['content']}")
# 4.2. Get from working memory
working_entries = memory.get(level=MemoryLevel.WORKING, limit=10)
for entry in working_entries:
    print(f"[{entry.priority}] {entry.content}")
# 4.3. Get from long-term memory
longterm_entries = memory.get(level=MemoryLevel.LONG_TERM)
# 4.4. Search by tags
insights = memory.search_by_tags({"insight"}, level=MemoryLevel.WORKING)
critical = memory.search_by_tags({"critical"}, level=MemoryLevel.LONG_TERM)
# 4.5. Get all entries
all_entries = memory.get_all()
# 5. Memory management
# 5.1. Remove an entry
memory.remove(entry_key)
# 5.2. Clear a level
memory.clear(level=MemoryLevel.WORKING)
# 5.3. Force compression
memory.compress(level=MemoryLevel.WORKING)
# 5.4. Promote an entry to long-term
memory.promote(entry_key)
# 5.5. Update an entry
memory.update(entry_key, new_content={"updated": "data"})
# 6. Stats
stats = memory.get_stats()
print(f"Working: {stats['working_count']}/{stats['working_max']}")
print(f"Long-term: {stats['longterm_count']}/{stats['longterm_max']}")
print(f"Total accesses: {stats['total_accesses']}")
print(f"Promotions: {stats['promotion_count']}")
```
---
#### SharedMemoryPool – memory sharing between agents
```python
from utils.memory import SharedMemoryPool
# 1. Create a pool
pool = SharedMemoryPool(max_shared_entries=1000)
# 2. Register agents
memory_a = AgentMemory("agent_a", config)
memory_b = AgentMemory("agent_b", config)
memory_c = AgentMemory("agent_c", config)
pool.register(memory_a)
pool.register(memory_b)
pool.register(memory_c)
# 3. Broadcast – send to everyone
pool.broadcast(
    from_agent="agent_a",
    entry={
        "content": "Important discovery: X correlates with Y",
        "priority": 8,
        "tags": {"discovery", "shared"},
    },
)
# All agents will receive this entry in working memory
# 4. Share – send to specific agents
pool.share(
    from_agent="agent_a",
    entry={"content": "Secret info", "priority": 9},
    to_agents=["agent_b", "agent_c"],
)
# Only agent_b and agent_c receive the entry
# 5. Query – request information from the pool
results = pool.query(
    tags={"discovery"},
    min_priority=5,
    limit=10,
)
for result in results:
    print(f"From {result['source_agent']}: {result['content']}")
# 6. Subscribe to updates (callback)
def on_shared_entry(entry, from_agent, to_agents):
    print(f"{from_agent} shared: {entry['content']}")
pool.subscribe("agent_b", on_shared_entry)
# 7. Remove from the pool
pool.unregister("agent_c")
# 8. Clear the pool
pool.clear()
```
---
#### Memory compression
```python
from utils.memory import (
    TruncateCompressor,
    SummaryCompressor,
)
# 1. Truncate – simple removal of old entries
compressor = TruncateCompressor(keep_ratio=0.5) # Keep 50%
memory = AgentMemory("agent", config)
memory.set_compressor(compressor)
# When over the limit, 50% of old entries are removed automatically
# 2. Summary – summarization using an LLM
def summarize_llm(entries: list[MemoryEntry]) -> str:
    texts = [e.content for e in entries]
    combined = "\n".join(texts)
    return my_llm(f"Summarize these entries: {combined}")
compressor = SummaryCompressor(
    summarizer=summarize_llm,
    chunk_size=10,  # Summarize in chunks of 10 entries
)
memory.set_compressor(compressor)
# On compression, 10 entries are replaced with 1 summarized entry
# 3. Custom compressor
from utils.memory import MemoryCompressor
class SmartCompressor(MemoryCompressor):
    def compress(self, entries: list[MemoryEntry], target_count: int) -> list[MemoryEntry]:
        # Remove low-priority and old entries
        sorted_entries = sorted(
            entries,
            key=lambda e: (e.priority, e.timestamp),
            reverse=True,
        )
        return sorted_entries[:target_count]
memory.set_compressor(SmartCompressor())
```
---
#### Integrating memory with the Runner
```python
from execution import MACPRunner, RunnerConfig
# 1. Configuration with memory enabled
config = RunnerConfig(
    enable_memory=True,
    memory_config=MemoryConfig(
        working_max_entries=20,
        long_term_max_entries=100,
        auto_compress=True,
        promote_after_accesses=3,
    ),
    memory_context_limit=5,     # How many entries to inject into the prompt
    enable_shared_memory=True,  # Enable SharedMemoryPool
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# 2. Run – memory is updated automatically
result1 = runner.run_round(graph)
# 3. Access an agent's memory
memory = runner.get_agent_memory("researcher")
entries = memory.get_messages(limit=10)
print(f"Researcher memory: {entries}")
# 4. Manually add to memory
runner.add_to_memory(
    "researcher",
    content="External knowledge: XYZ",
    level=MemoryLevel.LONG_TERM,
    priority=8,
)
# 5. Second round β agents retain context
graph.query = "Continue analysis from previous round"
result2 = runner.run_round(graph)
# 6. Export memories
memory_export = runner.export_memories()
# {
# "agent_a": {"working": [...], "long_term": [...]},
# "agent_b": {"working": [...], "long_term": [...]},
# }
# 7. Import memories (restore state)
runner.import_memories(memory_export)
# 8. Clear memory for all agents
runner.clear_all_memories()
```
---
#### Advanced usage: Semantic memory search
```python
from utils.memory import SemanticMemoryIndex
from core import NodeEncoder
# 1. Create a semantic index
encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2")
semantic_index = SemanticMemoryIndex(encoder)
# 2. Add entries to the index
memory = AgentMemory("agent", config)
for entry in memory.get_all():
semantic_index.add(entry.key, entry.content, entry.tags)
# 3. Semantic search
query = "findings about correlation"
results = semantic_index.search(
    query,
    top_k=5,
    min_similarity=0.7,
    filter_tags={"finding"},
)
for result in results:
    print(f"[{result['similarity']:.3f}] {result['content']}")
# 4. Integration with AgentMemory
memory.enable_semantic_search(encoder)
# Now you can search semantically
results = memory.semantic_search(
    query="data patterns",
    top_k=3,
    level=MemoryLevel.LONG_TERM,
)
```
---
#### Practical example: Multi-round conversation with memory
```python
# Create a graph with memory
agents = [
    AgentProfile(agent_id="analyzer", display_name="Data Analyzer"),
    AgentProfile(agent_id="reporter", display_name="Report Writer"),
]
graph = build_property_graph(
    agents,
    workflow_edges=[("analyzer", "reporter")],
    query="Analyze dataset.csv",
)
# Memory-enabled configuration
config = RunnerConfig(
    enable_memory=True,
    memory_config=MemoryConfig(
        working_max_entries=15,
        long_term_max_entries=50,
        auto_compress=True,
        promote_after_accesses=2,
    ),
    memory_context_limit=5,
    enable_shared_memory=True,
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# Round 1: Initial analysis
graph.query = "Analyze the dataset and find key patterns"
result1 = runner.run_round(graph)
print(f"Round 1 answer: {result1.final_answer}")
# Analyzer saved findings to memory
analyzer_memory = runner.get_agent_memory("analyzer")
print(f"Analyzer memory entries: {len(analyzer_memory.get_all())}")
# Round 2: Deeper analysis (agents remember the previous round)
graph.query = "Based on previous findings, analyze correlations"
result2 = runner.run_round(graph)
print(f"Round 2 answer: {result2.final_answer}")
# Round 3: Report generation
graph.query = "Generate final report summarizing all findings"
result3 = runner.run_round(graph)
print(f"Round 3 answer: {result3.final_answer}")
# Reporter used accumulated memory for a complete report
reporter_memory = runner.get_agent_memory("reporter")
# Export full history
history = {
    "round_1": result1.to_dict(),
    "round_2": result2.to_dict(),
    "round_3": result3.to_dict(),
    "memories": runner.export_memories(),
}
import json
with open("conversation_history.json", "w") as f:
    json.dump(history, f, indent=2)
```
---
### Streaming API
LangGraph-like streaming for real-time output.
```python
from execution import (
    MACPRunner,
    StreamEventType,
    StreamBuffer,
    format_event,
    print_stream,
)
runner = MACPRunner(llm_caller=my_llm)
# Synchronous streaming
for event in runner.stream(graph):
    if event.event_type == StreamEventType.AGENT_OUTPUT:
        print(f"{event.agent_id}: {event.content}")
    elif event.event_type == StreamEventType.TOKEN:
        print(event.token, end="", flush=True)
# Asynchronous streaming
async for event in runner.astream(graph):
    print(format_event(event))
# Using a buffer
buffer = StreamBuffer()
for event in runner.stream(graph):
    buffer.add(event)
    # ... handle the event
print(f"Final answer: {buffer.final_answer}")
print(f"Agent outputs: {buffer.agent_outputs}")
# Convenience printing
answer = print_stream(runner.stream(graph), show_tokens=True)
```
#### Event types (full specification)
```python
from execution.streaming import StreamEventType, StreamEvent
# === Execution lifecycle ===
StreamEventType.RUN_START
# Fields: run_id, query, num_agents, config
StreamEventType.RUN_END
# Fields: run_id, success, total_time, total_tokens, execution_order, final_answer
# === Agent events ===
StreamEventType.AGENT_START
# Fields: agent_id, step_index, predecessors, prompt_preview
StreamEventType.AGENT_OUTPUT
# Fields: agent_id, step_index, content, tokens_used, latency_ms
StreamEventType.AGENT_ERROR
# Fields: agent_id, step_index, error_type, error_message, will_retry
# === Token streaming ===
StreamEventType.TOKEN
# Fields: agent_id, token (str), token_index
# === Adaptive execution ===
StreamEventType.TOPOLOGY_CHANGED
# Fields: reason, old_plan, new_plan, remaining_steps
StreamEventType.PRUNE
# Fields: agent_id, reason (low_weight/low_probability/budget/quality)
StreamEventType.FALLBACK
# Fields: original_agent, fallback_agent, reason, attempt
# === Parallel execution ===
StreamEventType.PARALLEL_START
# Fields: group_agents (list), group_index
StreamEventType.PARALLEL_END
# Fields: group_agents, completed_count, failed_count, duration_ms
# === Budget ===
StreamEventType.BUDGET_WARNING
# Fields: budget_type (tokens/requests/time), current, limit, ratio
StreamEventType.BUDGET_EXCEEDED
# Fields: budget_type, current, limit, action_taken
# === Memory ===
StreamEventType.MEMORY_WRITE
# Fields: agent_id, memory_level (working/long_term), entry_key
StreamEventType.MEMORY_READ
# Fields: agent_id, memory_level, entry_key, found
StreamEventType.MEMORY_PROMOTED
# Fields: agent_id, entry_key, from_level, to_level
# === Metrics ===
StreamEventType.METRICS_UPDATE
# Fields: agent_id, metrics (dict with reliability, latency, quality, cost)
# Example: handling all event types
for event in runner.stream(graph):
    match event.event_type:
        case StreamEventType.RUN_START:
            print(f"Starting run {event.run_id} with {event.num_agents} agents")
        case StreamEventType.AGENT_START:
            print(f"Agent {event.agent_id} starting (step {event.step_index})")
        case StreamEventType.AGENT_OUTPUT:
            print(f"Agent {event.agent_id}: {event.content[:100]}...")
            print(f"  Tokens: {event.tokens_used}, Latency: {event.latency_ms}ms")
        case StreamEventType.TOKEN:
            print(event.token, end="", flush=True)
        case StreamEventType.TOPOLOGY_CHANGED:
            print(f"⟳ Topology changed: {event.reason}")
            print(f"  New plan: {event.new_plan}")
        case StreamEventType.PRUNE:
            print(f"✂ Pruned {event.agent_id}: {event.reason}")
        case StreamEventType.FALLBACK:
            print(f"↷ Fallback: {event.original_agent} → {event.fallback_agent}")
        case StreamEventType.PARALLEL_START:
            print(f"⫸ Starting parallel group: {event.group_agents}")
        case StreamEventType.PARALLEL_END:
            print(f"⫷ Parallel group done: {event.completed_count}/{len(event.group_agents)}")
        case StreamEventType.BUDGET_WARNING:
            print(f"⚠ Budget warning: {event.budget_type} at {event.ratio:.1%}")
        case StreamEventType.BUDGET_EXCEEDED:
            print(f"❌ Budget exceeded: {event.budget_type}")
        case StreamEventType.RUN_END:
            print(f"✅ Execution completed in {event.total_time:.2f}s")
            print(f"  Total tokens: {event.total_tokens}")
            print(f"  Final answer: {event.final_answer[:100]}...")
---
## Advanced Features
### Execution optimization and token savings
The framework provides several mechanisms to optimize execution and reduce token usage:
#### 1. Filtering isolated nodes
Automatically exclude nodes that are not on the path from start to end:
```python
# Set execution bounds
graph.set_execution_bounds("input", "output")
# Filter isolated nodes during execution
result = runner.run_round(
    graph,
    filter_unreachable=True,  # Exclude nodes not on the input->output path
)
# Nodes unrelated to the input->output path will not be executed
print(f"Agents excluded: {len(result.pruned_agents or [])}")
```
**Example:**
```python
builder = GraphBuilder()
builder.add_agent("a1")
builder.add_agent("a2")
builder.add_agent("a3")
builder.add_agent("isolated") # Not connected to a1->a3
builder.add_workflow_edge("a1", "a2")
builder.add_workflow_edge("a2", "a3")
builder.set_execution_bounds("a1", "a3")
graph = builder.build()
# Reachability analysis
relevant = graph.get_relevant_nodes() # {"a1", "a2", "a3"}
isolated = graph.get_isolated_nodes() # {"isolated"}
result = runner.run_round(graph, filter_unreachable=True)
# "isolated" will not run β token savings
```
#### 2. Node deactivation (Disabled Nodes)
Temporarily deactivate nodes without removing them from the graph:
```python
# Deactivate based on metrics/RL
if quality_score < threshold:
    graph.disable("expensive_agent")
# Or multiple nodes
graph.disable(["agent1", "agent2"])
# Check
if graph.is_enabled("agent1"):
    ...
# Re-enable
graph.enable("agent1")
graph.enable() # All
result = runner.run_round(graph)
# Deactivated nodes appear in result.pruned_agents
```
**Use case: RL control**
```python
# An RL agent decides which nodes to deactivate
for agent_id in graph.node_ids:
    rl_score = rl_model.predict(graph_state, agent_id)
    if rl_score < 0.3:
        graph.disable(agent_id)
result = runner.run_round(graph)
```
#### 3. Early stopping
Stop execution when a condition is met:
```python
from execution import EarlyStopCondition, RunnerConfig
# By keyword
stop1 = EarlyStopCondition.on_keyword("FINAL ANSWER")
# By token limit
stop2 = EarlyStopCondition.on_token_limit(5000)
# By number of agents
stop3 = EarlyStopCondition.on_agent_count(3)
# By metadata (for RL/metrics)
stop4 = EarlyStopCondition.on_metadata(
    "quality", 0.95,
    comparator=lambda v, t: v > t,
)
# Custom logic
stop5 = EarlyStopCondition.on_custom(
    lambda ctx: my_evaluator.is_done(ctx.messages),
    reason="Evaluator decided task is done",
    min_agents_executed=2,  # At least 2 agents before checking
)
# Combination (OR)
stop_any = EarlyStopCondition.combine_any([
    EarlyStopCondition.on_keyword("DONE"),
    EarlyStopCondition.on_token_limit(10000),
])
config = RunnerConfig(
    early_stop_conditions=[stop1, stop2, stop5],
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
if result.early_stopped:
    print(f"Reason: {result.early_stop_reason}")
    saved = len(graph.node_ids) - len(result.execution_order)
    print(f"Agents saved: {saved}")
```
#### 4. Runtime topology (Topology Hooks)
Modify the graph **during execution** based on intermediate results:
```python
from execution import TopologyAction, StepContext
def adaptive_topology(ctx: StepContext, graph) -> TopologyAction:
    """Hook is called after each agent."""
    # ctx.agent_id         – current agent
    # ctx.response         – its response
    # ctx.messages         – all responses
    # ctx.execution_order  – execution order
    # ctx.remaining_agents – remaining agents
    # ctx.total_tokens     – tokens used
    # Add an edge if review is needed
    if "uncertain" in (ctx.response or "").lower():
        return TopologyAction(
            add_edges=[(ctx.agent_id, "reviewer", 1.0)],
            trigger_rebuild=True,
        )
    # Remove an edge (e.g., when a confidence flag was set in ctx.metadata)
    if ctx.metadata.get("confident"):
        return TopologyAction(
            remove_edges=[("agent1", "checker")],
        )
    # Skip agents
    if ctx.total_tokens > 8000:
        return TopologyAction(
            skip_agents=["expensive_agent"],
        )
    # Early stop
    if "DONE" in (ctx.response or ""):
        return TopologyAction(
            early_stop=True,
            early_stop_reason="Task completed",
        )
    return None
config = RunnerConfig(
    enable_dynamic_topology=True,
    topology_hooks=[adaptive_topology],
)
```
#### 5. Combined optimization
Use all mechanisms together for maximum optimization:
```python
from execution import (
    GraphBuilder, MACPRunner, RunnerConfig, PruningConfig,
    EarlyStopCondition, TopologyAction, StepContext,
)
# Build a graph
builder = GraphBuilder()
builder.add_agent("input")
builder.add_agent("solver")
builder.add_agent("checker")
builder.add_agent("expert") # Expensive agent
builder.add_agent("formatter")
builder.add_agent("optional") # Optional
builder.add_workflow_edge("input", "solver")
builder.add_workflow_edge("solver", "checker")
builder.add_workflow_edge("checker", "formatter")
# Set execution bounds
builder.set_execution_bounds("input", "formatter")
graph = builder.build()
# Disable optional nodes
graph.disable("optional")
# Adaptation hooks
def smart_topology(ctx: StepContext, graph) -> TopologyAction:
    # If solver is confident – skip checker
    if ctx.agent_id == "solver" and ctx.metadata.get("confidence", 0) > 0.95:
        return TopologyAction(skip_agents=["checker"])
    # If checker found an issue – add expert
    if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""):
        return TopologyAction(
            add_edges=[("checker", "expert", 1.0), ("expert", "formatter", 1.0)],
            trigger_rebuild=True,
        )
    return None
# Configure runner with optimization
config = RunnerConfig(
    adaptive=True,
    enable_dynamic_topology=True,
    topology_hooks=[smart_topology],
    early_stop_conditions=[
        EarlyStopCondition.on_keyword("FINAL_ANSWER"),
        EarlyStopCondition.on_token_limit(10000),
    ],
    pruning_config=PruningConfig(token_budget=15000),
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(
    graph,
    filter_unreachable=True,  # Exclude isolated nodes
)
# Optimization analysis
print(f"Agents executed: {len(result.execution_order)}")
print(f"Pruned: {len(result.pruned_agents or [])}")
print(f"Early stopped: {result.early_stopped}")
print(f"Modifications: {result.topology_modifications}")
print(f"Tokens: {result.total_tokens}")
```
---
### Multi-Model Support
Each agent in the graph can use its own LLM model with individual settings. This makes it possible to:
- **Optimize costs** – use expensive models only for complex tasks
- **Balance performance** – fast models for simple operations
- **Specialize agents** – models trained for specific domains
- **Hybrid solutions** – combine cloud and local models
#### Multi-model architecture
```
┌─────────────────────────────────────────────────────────────┐
│                         TASK NODE                           │
│                   "Analyze the market"                      │
└────────────────┬────────────────────────────────────────────┘
                 │
         ┌───────┴────────┐
         ▼                ▼
 ┌───────────────┐  ┌───────────────┐
 │    ANALYST    │  │  COORDINATOR  │
 │               │─▶│               │
 │     GPT-4     │  │  GPT-4o-mini  │
 │   temp: 0.0   │  │   temp: 0.3   │
 │ tokens: 4000  │  │ tokens: 1000  │
 └───────────────┘  └───────────────┘
```
---
#### Key components
**1. LLMConfig** – an agent's LLM configuration
```python
from core.schema import LLMConfig
llm_config = LLMConfig(
    model_name="gpt-4",                    # Model name
    base_url="https://api.openai.com/v1",  # API endpoint
    api_key="$OPENAI_API_KEY",             # Key (or $ENV_VAR)
    max_tokens=2000,                       # Max tokens in the response
    temperature=0.7,                       # Generation temperature
    timeout=60.0,                          # Request timeout
    top_p=0.9,                             # Nucleus sampling
    stop_sequences=["END"],                # Stop sequences
)
# Validate configuration
if llm_config.is_configured():
    params = llm_config.to_generation_params()
    print(f"Generation params: {params}")
# Merge configurations (fallback)
default_config = LLMConfig(model_name="gpt-4o-mini", temperature=0.5)
final_config = llm_config.merge_with(default_config)
```
**2. AgentLLMConfig** – an immutable configuration for AgentProfile
```python
from core.agent import AgentLLMConfig
agent_llm_config = AgentLLMConfig(
    model_name="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="sk-...",
    temperature=0.7,
    max_tokens=2000,
)
# Convert to LLMConfig
llm_config = agent_llm_config.to_llm_config()
```
**3. LLMCallerFactory** – a factory for creating LLM callers
```python
from execution import LLMCallerFactory
# Create a factory for OpenAI-compatible APIs
factory = LLMCallerFactory.create_openai_factory(
    default_model="gpt-4o-mini",
    default_base_url="https://api.openai.com/v1",
    default_api_key="sk-...",
    default_temperature=0.7,
    default_max_tokens=2000,
)
# The factory automatically creates callers based on AgentLLMConfig
# when used with MACPRunner
```
**4. Caller factory helpers**
Three ready-made functions cover the most common setups:
| Function | Interface | Use with |
|---|---|---|
| `create_openai_caller()` | `(str) -> str` | Legacy `llm_caller` |
| `create_openai_structured_caller()` | `(list[dict]) -> str` | `structured_llm_caller` ✅ **recommended** |
| `create_openai_async_structured_caller()` | `async (list[dict]) -> str` | `async_structured_llm_caller` ✅ parallel |
```python
from execution import (
    create_openai_caller,
    create_openai_structured_caller,
    create_openai_async_structured_caller,
)
# ── Legacy flat-string caller ────────────────────────────────────────────────
caller = create_openai_caller(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="sk-...",
    temperature=0.7,
    max_tokens=2000,
)
response = caller("What is 2+2?")  # (str) -> str
# ── Structured sync caller (recommended for chat LLMs) ──────────────────────
sync_caller = create_openai_structured_caller(
    api_key="sk-...",
    model="gpt-4o",
    temperature=0.7,
    max_tokens=1024,
)
# Use as: MACPRunner(structured_llm_caller=sync_caller)
# ── Structured async caller (required for parallel astream) ─────────────────
async_caller = create_openai_async_structured_caller(
    api_key="sk-...",
    model="gpt-4o",
    temperature=0.7,
    max_tokens=1024,
)
# Use as: MACPRunner(async_structured_llm_caller=async_caller)
# ── Full parallel setup ──────────────────────────────────────────────────────
from execution import MACPRunner, RunnerConfig
runner = MACPRunner(
    structured_llm_caller=sync_caller,
    async_structured_llm_caller=async_caller,
    config=RunnerConfig(enable_parallel=True),
)
# Sequential graphs – stream() uses sync_caller
for event in runner.stream(graph):
    ...
# Parallel graphs – astream() uses async_caller for concurrent groups
import asyncio
async def run():
    async for event in runner.astream(graph):
        ...
asyncio.run(run())
```
---
#### Ways to configure multi-model support
##### Method 1: Via GraphBuilder (recommended)
```python
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory
builder = GraphBuilder()
# Agent 1: strong model for analysis
builder.add_agent(
    agent_id="analyst",
    display_name="Senior Analyst",
    persona="Expert data analyst with deep domain knowledge",
    llm_backbone="gpt-4",  # Or model_name
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",
    temperature=0.0,  # Strict analysis
    max_tokens=4000,
    timeout=120.0,
)
# Agent 2: weaker model for formatting
builder.add_agent(
    agent_id="formatter",
    display_name="Report Formatter",
    persona="Formats data into readable reports",
    llm_backbone="gpt-4o-mini",
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",
    temperature=0.3,
    max_tokens=1000,
    timeout=30.0,
)
# Agent 3: local model for confidential data
builder.add_agent(
    agent_id="privacy_checker",
    display_name="Privacy Checker",
    llm_backbone="llama3:70b",
    base_url="http://localhost:11434/v1",  # Ollama
    api_key="not-needed",
    temperature=0.1,
    max_tokens=500,
)
builder.add_workflow_edge("analyst", "formatter")
builder.add_workflow_edge("analyst", "privacy_checker")
graph = builder.build()
# The factory will automatically create callers for each agent
factory = LLMCallerFactory.create_openai_factory()
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
print(f"Final answer: {result.final_answer}")
```
##### Method 2: Explicit LLMConfig
```python
from core.schema import LLMConfig
# Predefined configurations
gpt4_config = LLMConfig(
    model_name="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",
    temperature=0.7,
    max_tokens=2000,
)
gpt4_mini_config = LLMConfig(
    model_name="gpt-4o-mini",
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",
    temperature=0.5,
    max_tokens=1000,
)
builder = GraphBuilder()
builder.add_agent(
    "researcher",
    display_name="Researcher",
    llm_config=gpt4_config,  # Pass a ready configuration
)
builder.add_agent(
    "writer",
    display_name="Writer",
    llm_config=gpt4_mini_config,
)
graph = builder.build()
```
##### Method 3: llm_callers dictionary
```python
from execution import create_openai_caller
# Create callers manually
callers = {
    "analyst": create_openai_caller(
        model="gpt-4",
        temperature=0.0,
        max_tokens=4000,
    ),
    "formatter": create_openai_caller(
        model="gpt-4o-mini",
        temperature=0.3,
        max_tokens=1000,
    ),
    "privacy_checker": create_openai_caller(
        model="llama3:70b",
        base_url="http://localhost:11434/v1",
        api_key="not-needed",
    ),
}
# Pass directly into the runner
runner = MACPRunner(llm_callers=callers)
result = runner.run_round(graph)
```
##### Method 4: Combined approach
```python
# Use the factory as default, but override for some agents
factory = LLMCallerFactory.create_openai_factory(
    default_model="gpt-4o-mini",  # Default
)
# Create a custom caller for a specific agent
specialized_caller = create_openai_caller(
    model="gpt-4",
    temperature=0.0,
    max_tokens=4000,
)
runner = MACPRunner(
    llm_factory=factory,                          # For all agents
    llm_callers={"analyst": specialized_caller},  # Override for analyst
)
```
---
#### LLM caller resolution priority
```
1. llm_callers[agent_id]    – Explicitly provided caller
          ↓
2. llm_factory.get_caller() – Factory creates based on agent.llm_config
          ↓
3. llm_caller               – Default caller for all agents
          ↓
4. Exception                – Error: no caller specified
```
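The same precedence can be expressed as a short Python sketch. This is not the framework's actual internals, and the `get_caller()` argument shown is an assumption; it only illustrates the lookup order above:
```python
def resolve_caller(agent, llm_callers, llm_factory, llm_caller):
    # 1. An explicit per-agent override always wins
    if llm_callers and agent.agent_id in llm_callers:
        return llm_callers[agent.agent_id]
    # 2. Otherwise the factory builds a caller from the agent's own llm_config
    if llm_factory is not None and agent.llm_config is not None:
        return llm_factory.get_caller(agent.llm_config)  # argument assumed
    # 3. Otherwise fall back to the shared default caller
    if llm_caller is not None:
        return llm_caller
    # 4. Nothing configured -> error
    raise RuntimeError(f"No LLM caller configured for agent {agent.agent_id!r}")
```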
---
#### Usage examples
##### Example 1: Cost optimization
```python
# Cheap model for routine operations, expensive one for complex tasks
builder = GraphBuilder()
# 5 simple analysts (cheap model)
for i in range(5):
    builder.add_agent(
        f"analyst_{i}",
        display_name=f"Junior Analyst {i}",
        llm_backbone="gpt-4o-mini",
        temperature=0.3,
        max_tokens=500,
    )
    builder.add_workflow_edge(f"analyst_{i}", "senior")
# 1 senior analyst (expensive model)
builder.add_agent(
    "senior",
    display_name="Senior Analyst",
    llm_backbone="gpt-4",
    temperature=0.7,
    max_tokens=4000,
)
graph = builder.build()
# Savings: ~80% of tokens use the cheap model
```
##### Example 2: Hybrid solution (cloud + local model)
```python
builder = GraphBuilder()
# Public data β cloud model
builder.add_agent(
"public_analyzer",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
)
# Confidential data β local model
builder.add_agent(
"private_analyzer",
llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1",
api_key="not-needed",
)
# Aggregator → cheap cloud model
builder.add_agent(
"aggregator",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
)
builder.add_workflow_edge("public_analyzer", "aggregator")
builder.add_workflow_edge("private_analyzer", "aggregator")
graph = builder.build()
```
##### Example 3: Specialized models
```python
builder = GraphBuilder()
# Medical expert → a model trained on medical data
builder.add_agent(
"medical_expert",
llm_backbone="medical-llm-v2",
base_url="https://medical-api.example.com/v1",
api_key="$MEDICAL_API_KEY",
temperature=0.0, # Strict medical recommendations
)
# Legal expert → a model trained on legal texts
builder.add_agent(
"legal_expert",
llm_backbone="legal-llm-v3",
base_url="https://legal-api.example.com/v1",
api_key="$LEGAL_API_KEY",
temperature=0.0,
)
# Coordinator → general model
builder.add_agent(
"coordinator",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.5,
)
builder.add_workflow_edge("medical_expert", "coordinator")
builder.add_workflow_edge("legal_expert", "coordinator")
graph = builder.build()
```
##### Example 4: Different temperatures for different styles
```python
builder = GraphBuilder()
# Creative writer (high temperature)
builder.add_agent(
"creative_writer",
llm_backbone="gpt-4",
temperature=0.9, # Creativity
max_tokens=2000,
)
# Strict editor (low temperature)
builder.add_agent(
"strict_editor",
llm_backbone="gpt-4",
temperature=0.1, # Precision
max_tokens=1500,
)
# Final formatter (medium temperature)
builder.add_agent(
"formatter",
llm_backbone="gpt-4o-mini",
temperature=0.5, # Balance
max_tokens=1000,
)
builder.add_workflow_edge("creative_writer", "strict_editor")
builder.add_workflow_edge("strict_editor", "formatter")
graph = builder.build()
```
---
#### Supported providers
The framework supports **any OpenAI-compatible API**:
| Provider | Base URL | Notes |
|----------|----------|-------|
| **OpenAI** | `https://api.openai.com/v1` | GPT-4, GPT-4o-mini, GPT-3.5-turbo |
| **Anthropic** | via wrapper | Claude (requires an adapter) |
| **Ollama** | `http://localhost:11434/v1` | Local models (llama3, mistral, etc.) |
| **vLLM** | custom | Self-hosted models |
| **LiteLLM** | custom | Unified API for all providers |
| **Azure OpenAI** | `https://<resource>.openai.azure.com/` | Azure-hosted models |
| **GigaChat** | custom | Sber models |
| **Cloudflare Tunnels** | custom | Tunnel to a self-hosted endpoint |
```python
# Examples for different providers
# OpenAI
builder.add_agent("agent1", llm_backbone="gpt-4",
base_url="https://api.openai.com/v1")
# Ollama (local)
builder.add_agent("agent2", llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1")
# Azure OpenAI
builder.add_agent("agent3", llm_backbone="gpt-4",
base_url="https://myresource.openai.azure.com/")
# GigaChat
builder.add_agent("agent4", llm_backbone="GigaChat-Lightning",
base_url="https://gigachat-api.trycloudflare.com/v1")
# vLLM
builder.add_agent("agent5", llm_backbone="./models/Qwen3-80B",
base_url="https://my-vllm-server.com/v1")
```
---
#### Async and streaming support
```python
from execution import create_openai_caller
# Async caller per agent
async_callers = {
"agent1": create_openai_caller(model="gpt-4", is_async=True),
"agent2": create_openai_caller(model="gpt-4o-mini", is_async=True),
}
runner = MACPRunner(async_llm_callers=async_callers)
result = await runner.arun_round(graph)
# Streaming callers
streaming_callers = {
"agent1": create_openai_caller(model="gpt-4", is_streaming=True),
"agent2": create_openai_caller(model="gpt-4o-mini", is_streaming=True),
}
runner = MACPRunner(streaming_llm_callers=streaming_callers)
for event in runner.stream(graph):
if event.event_type == StreamEventType.TOKEN:
print(f"[{event.agent_id}] {event.token}", end="")
```
---
#### API key handling
```python
# 1. Direct
builder.add_agent("agent", api_key="sk-...")
# 2. From an environment variable (recommended)
builder.add_agent("agent", api_key="$OPENAI_API_KEY")
# When parsing, it is automatically resolved as os.getenv("OPENAI_API_KEY")
# 3. From a file
import os
os.environ["OPENAI_API_KEY"] = open("keys/openai.key").read().strip()
builder.add_agent("agent", api_key="$OPENAI_API_KEY")
```
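The `$VAR` convention boils down to an `os.getenv` lookup at parse time. A minimal sketch of that resolution step (the framework's actual parser may differ):
```python
import os

def resolve_api_key(value: str) -> str | None:
    """Resolve "$ENV_NAME" placeholders; plain keys pass through unchanged."""
    if value.startswith("$"):
        return os.getenv(value[1:])  # "$OPENAI_API_KEY" -> os.getenv("OPENAI_API_KEY")
    return value
```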
---
#### Monitoring multi-model execution
```python
from core.metrics import MetricsTracker
tracker = MetricsTracker()
runner = MACPRunner(
llm_factory=factory,
metrics_tracker=tracker,
)
result = runner.run_round(graph)
# Per-model analysis
for agent_id in graph.node_ids:
agent = graph.get_agent_by_id(agent_id)
model = agent.llm_config.model_name if agent.llm_config else "default"
metrics = tracker.get_node_metrics(agent_id)
print(f"\n{agent_id} ({model}):")
print(f" Latency: {metrics.avg_latency_ms:.0f}ms")
print(f" Tokens: {metrics.total_cost_tokens}")
print(f" Reliability: {metrics.reliability:.2%}")
```
---
#### Backward compatibility
Old code **continues to work** without changes:
```python
# Old approach (one LLM for all agents)
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
# ✅ Works as before
# New approach (multi-model)
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
# ✅ Uses per-agent models
```
---
### Structured Prompt for modern chat LLMs (recommended)
> **TL;DR:** if you use OpenAI, GigaChat, Anthropic, or any other
> chat-completion API, pass `structured_llm_caller` instead of the
> legacy `llm_caller`. The runner will send proper `system` / `user`
> roles to the LLM instead of one flat string. This produces shorter,
> more focused responses and saves tokens, especially in long agent chains.
#### The problem with the legacy `llm_caller`
The classic `llm_caller: Callable[[str], str]` interface passes the entire
prompt as a **single flat string**, combining persona, description, task and
messages from other agents:
```
"You are a mathematician.\n\nSolve step by step.\n\nTask: ...\n\nMessages from other agents:\n..."
```
Modern chat LLMs (OpenAI GPT-4, GigaChat, Claude, Gemini, …) expect messages
to be split into **roles** (`system`, `user`, `assistant`). When everything
arrives in one blob the model has to re-parse it, which leads to:
- 🔴 **Verbose, padded responses**: the model does not know how strictly to
follow the system instruction
- 🔴 **Token accumulation**: long chains accumulate more and more context
- 🔴 **Lower instruction-following quality**: especially for role-specific behaviour
#### The fix: `structured_llm_caller`
`MACPRunner` now supports a second caller interface that receives a
`list[dict[str, str]]`, exactly what the OpenAI chat completions API expects.
The full message list produced by `_build_prompt` is:
```python
[
    # 1. system: persona, description, tools hint, output_schema instruction
    {"role": "system", "content": "You are a mathematician. Solve step by step.\n\nAvailable tools: calculator.\n\nRespond with JSON matching: {\"type\":\"object\",...}"},
    # 2..N-1. agent.state: previous conversation turns replayed with correct roles
    {"role": "assistant", "content": "Previous answer turn 1…"},
    {"role": "user", "content": "Follow-up question turn 2…"},
    # … (as many entries as agent.state contains)
    # N. user: current task, input_schema hint, memory context, incoming agent messages
    {"role": "user", "content": "Task: 3x² - 7x + 2 = 0\n\nInput format: {...}\n\nMessages from other agents: ..."},
]
```
The runner builds this automatically (`_build_prompt` → `StructuredPrompt`)
and dispatches via `_call_llm`. No parsing, no heuristics, no hacks.
---
#### How it works internally
```
_build_prompt()
    │
    └──▶ StructuredPrompt
          ├── .text     → flat string  (used by legacy llm_caller)
          └── .messages → list[dict]   (used by structured_llm_caller)

MACPRunner._call_llm(caller, prompt)
    ├── if structured_llm_caller is set → calls structured_llm_caller(prompt.messages)
    └── else                            → calls caller(prompt.text)  # backward compat
```
Both representations are always built, so switching between interfaces
requires **zero changes** to graph/agent code.
> **What goes where in `messages`:**
>
> | Source field | Role | Note |
> |---|---|---|
> | `persona` + `description` | `system` | Always first message |
> | tool names (`has_tools()`) | `system` | Appended to system content |
> | `output_schema` | `system` | `"Respond with JSON matching: …"` |
> | `agent.state` entries | `assistant`/`user` | Replayed in order between system and final user |
> | query + `input_schema` + memory + incoming msgs | `user` | Always last message |
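Conceptually, `StructuredPrompt` is just a container carrying both views of the same prompt. A minimal sketch (the class name mirrors the docs above, but any field beyond `.text` / `.messages` is an assumption):
```python
from dataclasses import dataclass, field

@dataclass
class StructuredPromptSketch:
    """Illustrative stand-in for the framework's StructuredPrompt."""
    text: str  # flat string, consumed by the legacy llm_caller
    messages: list[dict[str, str]] = field(default_factory=list)  # chat view

prompt = StructuredPromptSketch(
    text="You are a mathematician.\n\nTask: solve 3x² - 7x + 2 = 0",
    messages=[
        {"role": "system", "content": "You are a mathematician."},
        {"role": "user", "content": "Task: solve 3x² - 7x + 2 = 0"},
    ],
)
```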
---
#### Built-in factory helpers (recommended, zero boilerplate)
The framework ships ready-made factory functions so you don't need to write
any boilerplate caller code yourself:
```python
from execution import (
MACPRunner,
RunnerConfig,
create_openai_structured_caller, # sync: for stream() / run_round()
create_openai_async_structured_caller, # async: for astream() / arun_round()
)
# ── Sequential graphs (chains, single agent) ────────────────────────────────
runner = MACPRunner(
structured_llm_caller=create_openai_structured_caller(
api_key="sk-...",
base_url="https://api.openai.com/v1",
model="gpt-4o",
temperature=0.7,
max_tokens=1024,
),
)
for event in runner.stream(graph):
...
# ── Parallel graphs (fan-in, fan-out) ───────────────────────────────────────
runner = MACPRunner(
structured_llm_caller=create_openai_structured_caller(
api_key="sk-...", model="gpt-4o"
),
async_structured_llm_caller=create_openai_async_structured_caller(
api_key="sk-...", model="gpt-4o"
),
config=RunnerConfig(enable_parallel=True),
)
async for event in runner.astream(graph):
...
```
> **Why two callers for parallel mode?** `stream()` is synchronous and
> uses `structured_llm_caller`. `astream()` with `enable_parallel=True`
> runs independent agents concurrently via `asyncio.gather` and therefore
> requires `async_structured_llm_caller`. For purely sequential graphs
> only the sync caller is needed.
---
#### Quick start (manual caller)
If you need custom logic (retries, logging, token tracking), write the
caller yourself; the interface is a simple function:
```python
from openai import OpenAI
from execution import MACPRunner, RunnerConfig
client = OpenAI(api_key="sk-...")
def my_structured_caller(messages: list[dict[str, str]]) -> str:
"""Drop-in replacement for any str->str llm_caller."""
resp = client.chat.completions.create(
model="gpt-4o",
messages=messages, # passed through as-is
max_tokens=1024,
temperature=0.7,
)
return resp.choices[0].message.content or ""
runner = MACPRunner(
structured_llm_caller=my_structured_caller,
config=RunnerConfig(timeout=60.0),
)
result = runner.run_round(graph)
print(result.final_answer)
```
#### Async variant (manual caller)
```python
import asyncio
from openai import AsyncOpenAI
aclient = AsyncOpenAI(api_key="sk-...")
async def my_async_structured_caller(messages: list[dict[str, str]]) -> str:
resp = await aclient.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=1024,
)
return resp.choices[0].message.content or ""
runner = MACPRunner(async_structured_llm_caller=my_async_structured_caller)
result = await runner.arun_round(graph)
```
---
#### Tracking tokens (benchmark pattern)
When you need to count tokens across many agents (e.g. for benchmarks), wrap
the OpenAI client to intercept `usage`:
```python
from openai import OpenAI
class TrackedLLM:
def __init__(self, api_key, base_url, model):
self._client = OpenAI(api_key=api_key, base_url=base_url)
self._model = model
self.total_tokens = 0
self.call_count = 0
def reset(self):
self.total_tokens = 0
self.call_count = 0
def chat(self, system: str, user: str, max_tokens: int = 1024) -> str:
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": user})
resp = self._client.chat.completions.create(
model=self._model, messages=messages,
temperature=0.7, max_tokens=max_tokens,
)
self.total_tokens += resp.usage.total_tokens if resp.usage else 0
self.call_count += 1
return resp.choices[0].message.content or ""
def as_structured_caller(self, max_tokens: int = 1024):
"""Return a structured_llm_caller for MACPRunner."""
def _caller(messages: list[dict[str, str]]) -> str:
system = next((m["content"] for m in messages if m["role"] == "system"), "")
user = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")  # last user message = current task
return self.chat(system, user, max_tokens=max_tokens)
return _caller
llm = TrackedLLM(api_key="...", base_url="...", model="gpt-4o")
runner = MACPRunner(
structured_llm_caller=llm.as_structured_caller(max_tokens=1024),
)
result = runner.run_round(graph)
print(f"Tokens used: {llm.total_tokens}, calls: {llm.call_count}")
```
---
#### Caller priority
All caller types can coexist. The resolution priority is:
```
structured_llm_caller    → Used for ALL plain agent calls when set
        │
        │  (an automatic str→str wrapper is also registered as llm_caller
        │   for internal checks; no code change needed)
        ▼
llm_callers[agent_id]    → Per-agent override (always takes precedence)
        ▼
llm_factory              → Factory by AgentLLMConfig
        ▼
llm_caller               → Legacy default
```
You can mix `structured_llm_caller` (global default) with per-agent
`llm_callers` overrides: the structured caller will be used for all agents
that don't have an explicit override.
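For example (a sketch reusing the factory helpers shown earlier in this document):
```python
runner = MACPRunner(
    # Global default: structured chat calls for every agent...
    structured_llm_caller=create_openai_structured_caller(
        api_key="sk-...", model="gpt-4o-mini"
    ),
    # ...except "analyst", which gets an explicit per-agent override
    llm_callers={
        "analyst": create_openai_caller(model="gpt-4", temperature=0.0),
    },
)
result = runner.run_round(graph)
```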
---
#### Providers comparison
| Provider | Recommended interface | Notes |
|---|---|---|
| **OpenAI** (GPT-4o, GPT-4, …) | `structured_llm_caller` ✅ | Native chat completions |
| **GigaChat / Sber** | `structured_llm_caller` ✅ | OpenAI-compatible API |
| **Anthropic Claude** | `structured_llm_caller` ✅ | Via adapter or LiteLLM |
| **Ollama** (local) | `structured_llm_caller` ✅ | OpenAI-compatible `/v1/chat/completions` |
| **vLLM** | `structured_llm_caller` ✅ | OpenAI-compatible server |
| **Azure OpenAI** | `structured_llm_caller` ✅ | Same API, different base URL |
| **Custom / non-chat API** | `llm_caller` (legacy) | Falls back to flat string |
---
#### Benchmark results (gMAS vs LangGraph)
The table below was measured with `examples/benchmark_vs_langgraph.py --runs 10`
using `structured_llm_caller`. LangGraph uses an equivalent explicit
`system` / `user` split on its side.
| Test topology | LangGraph time | gMAS time | Token Δ |
|---|---|---|---|
| Single agent (1) | baseline | ~+10% | ~+10% |
| Chain of 3 (3) | baseline | **−18 %** | **−11 %** |
| Fan-in 2→1 (3) | baseline | **−30 %** | **−22 %** |
| Chain of 7 (7) | baseline | **−10 %** | **−17 %** |
| Fan-out 1→3→1 (5) | baseline | **−19 %** | **−13 %** |
> Single-agent test is slightly slower in gMAS due to protocol overhead;
> this overhead amortises quickly as the number of agents grows.
---
#### Migration from `llm_caller` to `structured_llm_caller`
No changes to graph or agent code are required. Only the runner
instantiation changes:
```python
# Before (legacy)
runner = MACPRunner(llm_caller=lambda prompt: my_model(prompt))
# After (recommended)
runner = MACPRunner(
structured_llm_caller=lambda messages: my_model_chat(messages)
)
```
Both interfaces are fully supported. The legacy `llm_caller` is not
deprecated and will not be removed.
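If your model only accepts a flat string, you can still adopt the structured interface by flattening the message list yourself. A minimal sketch (the join format is an arbitrary choice, and `my_model` stands in for your own str→str model):
```python
def flatten_messages(messages: list[dict[str, str]]) -> str:
    """Collapse role-tagged chat messages back into one flat prompt string."""
    return "\n\n".join(f"[{m['role']}] {m['content']}" for m in messages)

runner = MACPRunner(
    structured_llm_caller=lambda messages: my_model(flatten_messages(messages))
)
```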
---
### Dynamic Topology
#### Static graph modification
Modify the graph structure before execution:
```python
# Add a new agent
new_agent = AgentProfile(agent_id="expert", display_name="Expert")
graph.add_node(new_agent, connections_to=["checker"])
# Change connections
graph.add_edge("solver", "expert", weight=0.9)
graph.remove_edge("solver", "checker")
# Disable nodes (without deletion)
graph.disable("expensive_agent") # Will not run, but remains in the graph
# Full topology update from a matrix
import torch
new_adjacency = torch.tensor([
[0, 1, 0],
[0, 0, 1],
[0, 0, 0],
], dtype=torch.float32)
graph.update_communication(
new_adjacency,
s_tilde=score_matrix, # Connection quality scores
p_matrix=probability_matrix # Transition probabilities
)
```
#### Runtime modification (during execution)
A powerful feature for modifying the graph **during a round** based on intermediate results:
##### Early stopping
```python
from execution import EarlyStopCondition, RunnerConfig
# 1. By keyword in the response
stop_on_answer = EarlyStopCondition.on_keyword(
"FINAL ANSWER",
reason="Answer found"
)
# 2. By token limit
stop_on_tokens = EarlyStopCondition.on_token_limit(
max_tokens=5000,
reason="Token budget exceeded"
)
# 3. By number of executed agents
stop_on_count = EarlyStopCondition.on_agent_count(
max_agents=5,
reason="Sufficient agents executed"
)
# 4. By a metadata value (for RL, metrics)
stop_on_quality = EarlyStopCondition.on_metadata(
"quality_score",
0.95,
comparator=lambda v, threshold: v > threshold,
reason="Quality threshold reached"
)
# 5. Custom condition
stop_custom = EarlyStopCondition.on_custom(
condition=lambda ctx: my_rl_agent.should_stop(ctx.messages),
reason="RL agent decided to stop",
min_agents_executed=2 # At least 2 agents before checking
)
# 6. Combine conditions (OR)
stop_any = EarlyStopCondition.combine_any([
EarlyStopCondition.on_keyword("DONE"),
EarlyStopCondition.on_token_limit(10000),
stop_on_quality,
])
# 7. Combine conditions (AND)
stop_all = EarlyStopCondition.combine_all([
EarlyStopCondition.on_keyword("answer"),
stop_on_quality,
])
# Usage
config = RunnerConfig(
early_stop_conditions=[stop_on_answer, stop_on_tokens]
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
if result.early_stopped:
print(f"Stopped: {result.early_stop_reason}")
print(f"Saved: {len(graph.node_ids) - len(result.execution_order)} agents")
```
##### Topology Hooks (on-the-fly graph modification)
```python
from execution import TopologyAction, StepContext, RunnerConfig
def my_topology_hook(ctx: StepContext, graph) -> TopologyAction:
"""Called after each execution step.
StepContext contains:
- agent_id: current agent
- response: its response
- messages: all responses so far
- execution_order: execution order
- remaining_agents: remaining agents
- total_tokens: tokens used
- metadata: arbitrary data
"""
# 1. Early stopping based on custom logic
if "TASK_COMPLETE" in (ctx.response or ""):
return TopologyAction(
early_stop=True,
early_stop_reason="Task marked as complete"
)
# 2. Add an edge if quality is low
if ctx.metadata.get("quality", 1.0) < 0.5:
return TopologyAction(
add_edges=[
(ctx.agent_id, "reviewer_agent", 1.0),
],
trigger_rebuild=True # Re-plan remaining steps
)
# 3. Remove an edge
if some_condition:
return TopologyAction(
remove_edges=[
("agent1", "agent2"),
]
)
# 4. Skip upcoming agents
if ctx.total_tokens > 8000:
return TopologyAction(
skip_agents=["expensive_agent1", "expensive_agent2"]
)
# 5. Force execution of agents
if needs_expert_review:
return TopologyAction(
force_agents=["expert_reviewer"]
)
# 6. Change the final agent
if early_finish:
return TopologyAction(
new_end_agent="quick_finalizer"
)
return None # No changes
# Async hook for integration with RL, APIs, etc.
async def rl_topology_hook(ctx: StepContext, graph) -> TopologyAction:
"""Async hook for more complex logic."""
# You can call async APIs, RL models, etc.
decision = await my_rl_agent.get_topology_decision(
messages=ctx.messages,
graph_state=graph.to_dict()
)
if decision.add_connection:
return TopologyAction(
add_edges=[(decision.from_node, decision.to_node, decision.weight)]
)
return None
# Usage
config = RunnerConfig(
enable_dynamic_topology=True,
topology_hooks=[my_topology_hook],
async_topology_hooks=[rl_topology_hook],
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
print(f"Topology modifications: {result.topology_modifications}")
```
##### Example: RL-controlled topology
```python
import torch
from your_rl_agent import RLAgent
class TopologyRL:
def __init__(self):
self.rl_agent = RLAgent()
def should_stop(self, ctx: StepContext) -> bool:
"""RL-agent decision for early stopping."""
state = self.encode_state(ctx)
action = self.rl_agent.predict(state)
return action == "STOP"
def get_topology_action(self, ctx: StepContext) -> TopologyAction | None:
"""RL agent decides how to change topology."""
state = self.encode_state(ctx)
action = self.rl_agent.predict(state)
if action == "ADD_REVIEWER":
return TopologyAction(
add_edges=[(ctx.agent_id, "reviewer", 1.0)],
trigger_rebuild=True
)
elif action == "SKIP_EXPENSIVE":
return TopologyAction(
skip_agents=["expensive_model"]
)
return None
def encode_state(self, ctx: StepContext) -> torch.Tensor:
# Encode state for RL
return torch.tensor([
len(ctx.messages),
ctx.total_tokens,
len(ctx.remaining_agents),
])
# Usage
rl_controller = TopologyRL()
config = RunnerConfig(
enable_dynamic_topology=True,
early_stop_conditions=[
EarlyStopCondition.on_custom(
rl_controller.should_stop,
reason="RL decided to stop"
)
],
topology_hooks=[rl_controller.get_topology_action],
)
```
##### Full example: adaptive system
```python
from execution import (
GraphBuilder, MACPRunner, RunnerConfig,
EarlyStopCondition, TopologyAction, StepContext
)
# Build the graph
builder = GraphBuilder()
builder.add_agent("input", persona="Input processor")
builder.add_agent("solver", persona="Problem solver")
builder.add_agent("checker", persona="Solution checker")
builder.add_agent("expensive_expert", persona="Expert (expensive)")
builder.add_agent("output", persona="Output formatter")
builder.add_workflow_edge("input", "solver")
builder.add_workflow_edge("solver", "checker")
builder.add_workflow_edge("checker", "output")
# expensive_expert is connected dynamically
builder.set_start_node("input")
builder.set_end_node("output")
builder.add_task(query="Solve the complex problem")
builder.connect_task_to_agents()
graph = builder.build()
# Hooks for adaptation
def adaptive_hook(ctx: StepContext, graph) -> TopologyAction:
# If checker found an issue β add expert
if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""):
return TopologyAction(
add_edges=[("checker", "expensive_expert", 1.0),
("expensive_expert", "output", 1.0)],
trigger_rebuild=True
)
# If solver produced a good answer β skip checker
if ctx.agent_id == "solver" and ctx.metadata.get("confidence", 0) > 0.95:
return TopologyAction(
skip_agents=["checker"],
reason="High confidence, skipping validation"
)
return None
# Configure runner
config = RunnerConfig(
adaptive=True,
enable_dynamic_topology=True,
topology_hooks=[adaptive_hook],
early_stop_conditions=[
EarlyStopCondition.on_keyword("FINAL_ANSWER"),
EarlyStopCondition.on_token_limit(10000),
],
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(
graph,
filter_unreachable=True # Exclude isolated nodes
)
# Result
print(f"Executed: {result.execution_order}")
print(f"Early stopped: {result.early_stopped}")
print(f"Topology mods: {result.topology_modifications}")
print(f"Tokens saved: calculated from pruned_agents")
```
---
### GNN Routing (Graph Neural Networks for Routing)
Using graph neural networks for **learnable** optimal routing based on execution history.
#### Overview of GNN models
| Model | Description | When to use |
|------|-------------|-------------|
| **GCN** (Graph Convolutional Network) | Classic convolution for graphs | Homogeneous graphs, simple tasks |
| **GAT** (Graph Attention Network) | Uses an attention mechanism | Edge importance varies |
| **GraphSAGE** | Neighbor sampling for large graphs | Large graphs, inductive learning |
| **GIN** (Graph Isomorphism Network) | Maximally expressive architecture | Complex patterns, small graphs |
---
#### Full example: training a GNN router
```python
from core.gnn import (
create_gnn_router,
GNNTrainer,
GNNRouterInference,
GNNModelType,
TrainingConfig,
FeatureConfig,
RoutingStrategy,
DefaultFeatureGenerator,
)
from core.metrics import MetricsTracker
import torch
from torch_geometric.data import Data
# ========== STEP 1: Collect execution data ==========
tracker = MetricsTracker()
# Run multiple rounds to accumulate metrics
for i in range(100):
result = runner.run_round(graph)
# Record per-node metrics
for agent_id in result.execution_order:
response = result.messages[agent_id]
tracker.record_node_execution(
node_id=agent_id,
success=True,
latency_ms=response["latency"],
cost_tokens=response["tokens"],
quality=evaluate_quality(response["content"]),
)
# Record edge traversal metrics
for i, agent_id in enumerate(result.execution_order[:-1]):
next_agent = result.execution_order[i + 1]
tracker.record_edge_traversal(
source=agent_id,
target=next_agent,
weight=graph.get_edge_weight(agent_id, next_agent),
success=True,
latency_ms=50,
)
# ========== STEP 2: Feature generation ==========
feature_config = FeatureConfig(
include_degree=True, # Node degrees
include_centrality=True, # Centrality (betweenness, closeness)
include_embeddings=True, # Agent embeddings
include_metrics=True, # Performance metrics
include_structural=True, # Structural features (clustering coef)
normalize=True, # Feature normalization
)
feature_gen = DefaultFeatureGenerator(config=feature_config)
node_features = feature_gen.generate_node_features(
graph,
graph.node_ids,
tracker,
) # Shape: (num_nodes, feature_dim)
edge_features = feature_gen.generate_edge_features(
graph,
tracker,
) # Shape: (num_edges, edge_feature_dim)
print(f"Node features shape: {node_features.shape}")
print(f"Edge features shape: {edge_features.shape}")
# ========== STEP 3: Prepare the dataset ==========
# Create PyTorch Geometric Data objects
train_data_list = []
val_data_list = []
for sample in dataset: # Your dataset with execution history
data = Data(
x=sample['node_features'], # Node features
edge_index=sample['edge_index'], # Edge connections (2, E)
edge_attr=sample['edge_features'], # Edge features
y=sample['labels'], # Labels (optimal next node, quality score, etc.)
)
if sample['is_train']:
train_data_list.append(data)
else:
val_data_list.append(data)
# ========== STEP 4: Training configuration ==========
training_config = TrainingConfig(
# Hyperparameters
learning_rate=1e-3,
hidden_dim=64,
num_layers=3,
dropout=0.2,
# Training
epochs=100,
batch_size=32,
patience=10, # Early stopping
# Task
task="node_classification", # or "link_prediction", "graph_regression"
num_classes=2, # For classification
# Optimization
optimizer="adam", # adam, sgd, adamw
weight_decay=1e-5,
scheduler="reduce_on_plateau", # step, cosine, reduce_on_plateau
# Device
device="cuda" if torch.cuda.is_available() else "cpu",
# Logging
log_interval=10,
save_best=True,
)
# ========== STEP 5: Create the model ==========
# 5.1. GCN (Graph Convolutional Network)
model_gcn = create_gnn_router(
model_type=GNNModelType.GCN,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
)
# 5.2. GAT (Graph Attention Network)
model_gat = create_gnn_router(
model_type=GNNModelType.GAT,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
heads=4, # Number of attention heads
concat=True, # Concatenate heads or average
)
# 5.3. GraphSAGE
model_sage = create_gnn_router(
model_type=GNNModelType.GraphSAGE,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
aggr="mean", # mean, max, lstm
)
# 5.4. GIN (Graph Isomorphism Network)
model_gin = create_gnn_router(
model_type=GNNModelType.GIN,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
train_eps=True, # Trainable epsilon
)
# ========== STEP 6: Train ==========
trainer = GNNTrainer(model_gat, training_config)
training_result = trainer.train(
train_data_list,
val_data_list,
verbose=True,
)
print(f"Best validation accuracy: {training_result['best_val_acc']:.3f}")
print(f"Best epoch: {training_result['best_epoch']}")
print(f"Training time: {training_result['training_time']:.2f}s")
# Save the model
trainer.save("gnn_router.pt")
# Load the model
trainer.load("gnn_router.pt")
# ========== STEP 7: Inference ==========
router = GNNRouterInference(
model=model_gat,
feature_generator=feature_gen,
)
# 7.1. Predict the next node (node selection)
prediction = router.predict(
graph,
source="coordinator",
candidates=["researcher", "analyst", "writer"],
metrics_tracker=tracker,
strategy=RoutingStrategy.ARGMAX, # ARGMAX, TOP_K, SAMPLING, THRESHOLD
)
print(f"Recommended nodes: {prediction.recommended_nodes}")
print(f"Scores: {prediction.scores}")
print(f"Confidence: {prediction.confidence:.3f}")
# 7.2. Top-K prediction
prediction_topk = router.predict(
graph,
source="coordinator",
candidates=["a", "b", "c", "d"],
strategy=RoutingStrategy.TOP_K,
k=2, # Return top 2
)
print(f"Top 2: {prediction_topk.recommended_nodes}")
# 7.3. Probabilistic sampling
prediction_sample = router.predict(
graph,
source="coordinator",
candidates=candidates,
strategy=RoutingStrategy.SAMPLING,
temperature=0.8, # Sampling temperature
)
# 7.4. Threshold filtering
prediction_threshold = router.predict(
graph,
source="coordinator",
candidates=candidates,
strategy=RoutingStrategy.THRESHOLD,
threshold=0.7, # Only nodes with prob > 0.7
)
# ========== STEP 8: Integrate with AdaptiveScheduler ==========
from execution import AdaptiveScheduler, RoutingPolicy
scheduler = AdaptiveScheduler(
policy=RoutingPolicy.GNN_BASED,
gnn_router=router,
gnn_threshold=0.6, # Min confidence to use the GNN
fallback_policy=RoutingPolicy.WEIGHTED_TOPO # Fallback on low confidence
)
plan = scheduler.build_plan(
graph.A_com,
graph.node_ids,
metrics_tracker=tracker,
)
# ========== STEP 9: Monitoring and fine-tuning ==========
# Collect new data after deployment
new_data = []
for i in range(20):
result = runner.run_round(graph)
# ... record data ...
new_data.append(create_data_sample(result))
# Fine-tune
trainer.fine_tune(
new_data,
epochs=10,
learning_rate=1e-4,
)
trainer.save("gnn_router_finetuned.pt")
# ========== Evaluation ==========
from core.gnn import evaluate_router
metrics = evaluate_router(
router,
test_data_list,
metrics=["accuracy", "f1", "precision", "recall"],
)
print(f"Test accuracy: {metrics['accuracy']:.3f}")
print(f"F1 score: {metrics['f1']:.3f}")
```
---
#### Comparing GNN models
```python
# Experiment: compare performance across models
models = {
"GCN": create_gnn_router(GNNModelType.GCN, in_channels, out_channels, config),
"GAT": create_gnn_router(GNNModelType.GAT, in_channels, out_channels, config),
"GraphSAGE": create_gnn_router(GNNModelType.GraphSAGE, in_channels, out_channels, config),
"GIN": create_gnn_router(GNNModelType.GIN, in_channels, out_channels, config),
}
results = {}
for name, model in models.items():
trainer = GNNTrainer(model, training_config)
result = trainer.train(train_data, val_data)
results[name] = result
# Comparison
import pandas as pd
df = pd.DataFrame([
{
"Model": name,
"Val Acc": res["best_val_acc"],
"Train Time": res["training_time"],
"Params": sum(p.numel() for p in models[name].parameters()),
}
for name, res in results.items()
])
print(df)
# Output:
# | Model | Val Acc | Train Time | Params |
# |-----------|---------|------------|---------|
# | GCN | 0.853 | 12.5s | 45123 |
# | GAT | 0.891 | 18.3s | 67891 |
# | GraphSAGE | 0.874 | 15.2s | 52341 |
# | GIN | 0.867 | 14.8s | 48976 |
```
---
#### Production usage
```python
# Load a trained model
router = GNNRouterInference.load("gnn_router.pt", feature_gen)
# Integrate with the runner
config = RunnerConfig(
adaptive=True,
routing_policy=RoutingPolicy.GNN_BASED,
)
runner = MACPRunner(
llm_caller=my_llm,
config=config,
gnn_router=router,
metrics_tracker=tracker,
)
# Execute with GNN routing
result = runner.run_round(graph)
# Monitor GNN predictions
print(f"GNN predictions used: {result.gnn_prediction_count}")
print(f"Fallback to heuristic: {result.fallback_to_heuristic_count}")
```
---
### Hidden Channels
Hidden channels allow passing **implicit information** between agents as vector representations, bypassing text prompts. This is especially useful for:
- Passing contextual information without increasing prompt length
- Preserving semantic embeddings for downstream tasks
- Implementing attention mechanisms between agents
- Integrating with a GNN to predict next steps
#### Hidden channel architecture
```
┌───────────────┐    hidden_state      ┌───────────────┐
│    Agent A    │ ───────────────────▶ │    Agent B    │
│  (embedding)  │     embedding        │   (receives   │
└───────────────┘                      │   combined)   │
                                       └───────────────┘
```
Each agent owns two vectors:
- **`embedding`**: a vector representation of the agent description
- **`hidden_state`**: a hidden state updated after each execution
The runner combines predecessor `hidden_state` and `embedding` and passes them to the next agent.
#### Using hidden channels
```python
from execution import RunnerConfig, MACPRunner, HiddenState
from core import NodeEncoder
# 1. Create an encoder for embeddings
encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2")
# 2. Hidden-channel configuration
config = RunnerConfig(
enable_hidden_channels=True,
hidden_combine_strategy="mean", # Combine strategy
pass_embeddings=True, # Pass embeddings too
hidden_dim=384, # Hidden state dimensionality
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# 3. Compute agent embeddings
texts = [agent.to_text() for agent in graph.agents]
embeddings = encoder.encode(texts)
for agent, emb in zip(graph.agents, embeddings):
agent = agent.with_embedding(emb)
graph.update_agent(agent.agent_id, agent)
# 4. Execute with hidden channels
result = runner.run_round_with_hidden(
graph,
hidden_encoder=encoder, # To create hidden_state from responses
)
# 5. Access hidden states after execution
for agent_id, hidden in result.hidden_states.items():
print(f"{agent_id}:")
print(f" Hidden state: {hidden.tensor.shape}") # (hidden_dim,)
print(f" Embedding: {hidden.embedding.shape}") # (embedding_dim,)
print(f" Combined: {hidden.combined.shape}") # (hidden_dim + embedding_dim,)
# 6. Use hidden states for downstream tasks
hidden_states_matrix = torch.stack([
result.hidden_states[aid].tensor for aid in graph.node_ids
]) # Shape: (num_agents, hidden_dim)
# For example, cluster agents by semantics
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=3)
clusters = kmeans.fit_predict(hidden_states_matrix.cpu().numpy())
```
#### Combine strategies (`hidden_combine_strategy`)
When an agent has multiple predecessors, their hidden states are combined:
```python
# 1. "mean" β average (default)
# hidden_combined = mean([h1, h2, h3])
config.hidden_combine_strategy = "mean"
# 2. "sum" β sum
# hidden_combined = h1 + h2 + h3
config.hidden_combine_strategy = "sum"
# 3. "concat" β concatenation
# hidden_combined = concat([h1, h2, h3]) # dimensionality increases
config.hidden_combine_strategy = "concat"
# 4. "attention" β weighted attention (weights from adjacency)
# hidden_combined = w1*h1 + w2*h2 + w3*h3, where wi = edge_weight(i -> current)
config.hidden_combine_strategy = "attention"
# 5. "max" β elementwise max
# hidden_combined = max(h1, h2, h3)
config.hidden_combine_strategy = "max"
```
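For intuition, each strategy maps onto a single tensor operation. A minimal sketch with `torch`, where three random 384-dim vectors stand in for real predecessor hidden states:
```python
import torch

h1, h2, h3 = (torch.randn(384) for _ in range(3))
h = torch.stack([h1, h2, h3])              # (num_predecessors, hidden_dim)
w = torch.tensor([0.5, 0.3, 0.2])          # e.g. incoming edge weights

combined_mean = h.mean(dim=0)              # "mean"
combined_sum = h.sum(dim=0)                # "sum"
combined_concat = h.flatten()              # "concat": dim grows to 3 * 384
combined_attn = (w.unsqueeze(1) * h).sum(dim=0)  # "attention": weighted sum
combined_max = h.max(dim=0).values         # "max": elementwise maximum
```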
#### Advanced: custom hidden-state processing
```python
from utils.memory import HiddenChannel
# Create a custom HiddenChannel
channel = HiddenChannel(
node_id="agent_id",
hidden_dim=384,
)
# Set hidden state
import torch
channel.set_hidden(torch.randn(384))
channel.set_embedding(torch.randn(384))
# Get combined representation
combined = channel.get_combined(strategy="attention", edge_weights=torch.tensor([0.8, 0.2]))
# Reset
channel.reset()
# Integration with agent memory
from utils.memory import AgentMemory
memory = AgentMemory("agent_id")
memory.hidden_state = torch.randn(384)
memory.embedding = torch.randn(384)
# Get what to pass to the next agent
hidden_to_pass = memory.hidden_state
embedding_to_pass = memory.embedding
```
#### Using with a GNN
```python
from core.gnn import GNNRouterInference, DefaultFeatureGenerator
# 1. Hidden states as features for a GNN
feature_gen = DefaultFeatureGenerator()
# Include hidden states into node features
node_features = feature_gen.generate_node_features(
graph,
graph.node_ids,
metrics_tracker,
include_hidden_states=True, # Add hidden_state to features
)
# 2. GNN predicts the next agent based on hidden states
router = GNNRouterInference(model, feature_gen)
prediction = router.predict(
graph,
source="current_agent",
candidates=["next1", "next2"],
metrics_tracker=tracker,
hidden_states=result.hidden_states, # Pass current hidden states
)
# 3. Update the graph based on GNN predictions
if prediction.confidence > 0.8:
next_agent = prediction.recommended_nodes[0]
graph.add_edge("current_agent", next_agent, weight=prediction.confidence)
```
#### Example: multi-hop reasoning with hidden channels
```python
# Task: multi-hop reasoning where each agent accumulates context
agents = [
AgentProfile(agent_id="reader", display_name="Document Reader"),
AgentProfile(agent_id="analyzer", display_name="Analyzer"),
AgentProfile(agent_id="reasoner", display_name="Reasoner"),
AgentProfile(agent_id="answerer", display_name="Final Answerer"),
]
edges = [
("reader", "analyzer"),
("analyzer", "reasoner"),
("reasoner", "answerer"),
]
graph = build_property_graph(agents, edges, query="Complex question")
# Enable hidden channels for context passing
config = RunnerConfig(
enable_hidden_channels=True,
hidden_combine_strategy="attention",
pass_embeddings=True,
)
encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2")
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round_with_hidden(graph, hidden_encoder=encoder)
# After each step, hidden_state contains the "accumulated context"
# answerer receives a weighted combination of all previous hidden states
```
---
### Adaptive execution
Full control over adaptive execution:
```python
from execution import (
MACPRunner,
RunnerConfig,
RoutingPolicy,
PruningConfig,
BudgetConfig,
ErrorPolicy,
ErrorAction,
)
config = RunnerConfig(
adaptive=True,
enable_parallel=True,
max_parallel_size=5,
routing_policy=RoutingPolicy.BEAM_SEARCH,
pruning_config=PruningConfig(
min_weight_threshold=0.1,
token_budget=10000,
enable_fallback=True,
max_fallback_attempts=2,
quality_scorer=lambda response: evaluate_quality(response),
min_quality_threshold=0.5,
),
budget_config=BudgetConfig(
total_token_limit=50000,
max_prompt_length=4000,
node_token_limit=2000,
),
error_policy=ErrorPolicy(
on_timeout=ErrorAction.RETRY,
on_retry_exhausted=ErrorAction.PRUNE,
on_budget_exceeded=ErrorAction.ABORT,
),
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
print(f"Topology changes: {result.topology_changed_count}")
print(f"Fallbacks: {result.fallback_count}")
print(f"Pruned agents: {result.pruned_agents}")
```
---
## Configuration
### Environment variables
```bash
# API key (required)
export RWXF_API_KEY="sk-your-api-key"
# or via file
export RWXF_API_KEY_FILE=/secure/rwxf.key
# LLM service URL
export RWXF_BASE_URL="https://api.openai.com/v1"
# Models
export RWXF_MODEL_NAME="gpt-4o-mini"
export RWXF_EMBEDDING_MODEL="sentence-transformers/all-MiniLM-L6-v2"
# Logging
export RWXF_LOG_LEVEL="INFO"
export RWXF_LOG_FILE="./logs/framework.log"
# Network settings
export RWXF_DEFAULT_TIMEOUT=60
export RWXF_MAX_RETRIES=3
```
### Programmatic configuration
```python
from config import FrameworkSettings, load_settings
# Load from environment
settings = FrameworkSettings()
# Load from a .env file
settings = load_settings(".env")
# Access settings
api_key = settings.resolved_api_key
model = settings.model_name
timeout = settings.default_timeout
```
---
## Usage examples
### Example 1: Simple pipeline
```python
from execution import AgentProfile, MACPRunner
from builder import build_property_graph
agents = [
AgentProfile(agent_id="researcher", display_name="Researcher"),
AgentProfile(agent_id="writer", display_name="Writer"),
AgentProfile(agent_id="editor", display_name="Editor"),
]
graph = build_property_graph(
agents,
workflow_edges=[("researcher", "writer"), ("writer", "editor")],
query="Write an article about quantum computers",
)
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
print(result.final_answer)
```
### Example 2: Parallel processing
```python
# Agents work in parallel, then results are aggregated
agents = [
AgentProfile(agent_id="analyst_1", display_name="Financial Analyst"),
AgentProfile(agent_id="analyst_2", display_name="Market Analyst"),
AgentProfile(agent_id="analyst_3", display_name="Risk Analyst"),
AgentProfile(agent_id="aggregator", display_name="Report Aggregator"),
]
edges = [
("analyst_1", "aggregator"),
("analyst_2", "aggregator"),
("analyst_3", "aggregator"),
]
graph = build_property_graph(agents, workflow_edges=edges, query="Analyze company X")
config = RunnerConfig(
enable_parallel=True,
max_parallel_size=3,
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = await runner.arun_round(graph)
```
### Example 3: Streaming with a callback
```python
def on_event(event):
if event.event_type == StreamEventType.AGENT_OUTPUT:
save_to_db(event.agent_id, event.content)
notify_frontend(event)
runner = MACPRunner(llm_caller=my_llm)
for event in runner.stream(graph):
on_event(event)
if event.event_type == StreamEventType.TOKEN:
yield event.token # For SSE or WebSocket
```
### Example 4: Working with memory
```python
from execution import MACPRunner, RunnerConfig, MemoryConfig
config = RunnerConfig(
enable_memory=True,
memory_config=MemoryConfig(
working_max_entries=20,
long_term_max_entries=100,
),
memory_context_limit=5, # Include last 5 entries in the prompt
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# First round
result1 = runner.run_round(graph)
# Second round β agents remember context
graph.query = "Continue the previous task"
result2 = runner.run_round(graph)
# Access agent memory
agent_memory = runner.get_agent_memory("solver")
entries = agent_memory.get_messages()
```
### Example 5: Graph visualization
```python
from core import AgentProfile
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
MermaidDirection,
NodeStyle,
NodeShape,
# Convenience functions
to_mermaid,
to_ascii,
to_dot,
print_graph,
render_to_image,
)
from builder import build_property_graph
# Create a graph
agents = [
AgentProfile(
agent_id="input",
display_name="Input Handler",
tools=["api_reader"],
),
AgentProfile(
agent_id="processor",
display_name="Data Processor",
tools=["pandas", "torch"],
),
AgentProfile(
agent_id="output",
display_name="Output Formatter",
tools=["json", "csv"],
),
]
graph = build_property_graph(
agents,
workflow_edges=[("input", "processor"), ("processor", "output")],
query="Process data pipeline",
include_task_node=True,
)
# Option 1: Quick visualization (convenience functions)
print("=== MERMAID ===")
mermaid = to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT)
print(mermaid)
print("\n=== ASCII ===")
ascii_art = to_ascii(graph, show_edges=True)
print(ascii_art)
print("\n=== COLORED (if Rich is installed) ===")
print_graph(graph, format="auto") # Automatically chooses colored or ascii
# Option 2: Advanced visualization with custom styles (Pydantic models)
# Create a style (Pydantic model with validation)
custom_style = VisualizationStyle(
direction=MermaidDirection.LEFT_RIGHT,
agent_style=NodeStyle(
shape=NodeShape.ROUND,
fill_color="#e3f2fd",
stroke_color="#1976d2",
icon="π€",
),
task_style=NodeStyle(
shape=NodeShape.DIAMOND,
fill_color="#fff3e0",
stroke_color="#f57c00",
icon="π",
),
show_weights=True,
show_tools=True,
max_label_length=30,
)
# Create a visualizer with the custom style
viz = GraphVisualizer(graph, custom_style)
# Mermaid with a title
mermaid_styled = viz.to_mermaid(title="Data Pipeline")
print("\n=== STYLED MERMAID ===")
print(mermaid_styled)
# Save to files
viz.save_mermaid("pipeline.md", title="Data Pipeline") # Markdown with ```mermaid```
viz.save_dot("pipeline.dot", graph_name="DataPipeline")
# Render to images (requires system Graphviz)
try:
render_to_image(graph, "pipeline.png", format="png", dpi=150, style=custom_style)
render_to_image(graph, "pipeline.svg", format="svg", style=custom_style)
print("\nβ
Images created: pipeline.png, pipeline.svg")
except Exception as e:
print(f"\nβ οΈ Image rendering failed: {e}")
print(" Install system Graphviz to render images")
# Adjacency matrix (text representation)
print("\n=== ADJACENCY MATRIX ===")
matrix = viz.to_adjacency_matrix(show_labels=True)
print(matrix)
# Rich Console output with trees and tables
print("\n=== RICH CONSOLE ===")
viz.print_colored()
```
### Example 6: Conditional routing
```python
from builder import GraphBuilder
from execution.scheduler import ConditionContext
# Define conditions
def is_high_quality(context: ConditionContext) -> bool:
return context.state.get("quality", 0) > 0.8
def needs_review(context: ConditionContext) -> bool:
return context.state.get("word_count", 0) > 1000
# Build a graph with conditional edges
builder = GraphBuilder()
builder.add_agent(agent_id="writer", display_name="Content Writer")
builder.add_agent(agent_id="editor", display_name="Quick Editor")
builder.add_agent(agent_id="reviewer", display_name="Senior Reviewer")
builder.add_agent(agent_id="publisher", display_name="Publisher")
# Conditional transitions
builder.add_conditional_edge("writer", "editor", condition=is_high_quality)
builder.add_conditional_edge("writer", "reviewer", condition=needs_review)
builder.add_workflow_edge("editor", "publisher")
builder.add_workflow_edge("reviewer", "publisher")
graph = builder.build()
# Run
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
```
### Example 7: Monitoring with events
```python
from core.events import (
global_event_bus,
EventType,
MetricsEventHandler,
)
# Configure event handlers
bus = global_event_bus()
metrics_handler = MetricsEventHandler()
# Subscribe to events
bus.subscribe(None, metrics_handler) # Listen to all events
@bus.subscribe(EventType.STEP_COMPLETED)
def on_step_completed(event):
print(f"β
{event.agent_id} completed in {event.duration_ms:.0f}ms")
@bus.subscribe(EventType.BUDGET_WARNING)
def on_budget_warning(event):
print(f"β οΈ Budget {event.budget_type}: {event.ratio:.1%}")
# Run with monitoring
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
# Get aggregated metrics
metrics = metrics_handler.get_metrics()
print(f"Total tokens: {metrics['total_tokens']}")
print(f"Errors: {metrics['errors_count']}")
print(f"Avg step duration: {metrics['avg_step_duration_ms']:.1f}ms")
```
### Example 8: GNN routing with training
```python
from core.gnn import (
create_gnn_router,
GNNTrainer,
GNNRouterInference,
GNNModelType,
TrainingConfig,
DefaultFeatureGenerator,
)
from core.metrics import MetricsTracker
import torch
# Collect execution data for training
tracker = MetricsTracker()
# ... run several rounds with different queries ...
for i in range(100):
result = runner.run_round(graph)
# Record metrics
for agent_id, response in result.messages.items():
tracker.record_node_execution(
node_id=agent_id,
success=True,
latency_ms=response["latency"],
cost_tokens=response["tokens"],
quality=evaluate_quality(response["content"]),
)
# Feature generation
feature_gen = DefaultFeatureGenerator()
node_features = feature_gen.generate_node_features(
graph,
graph.node_ids,
tracker,
)
# Create dataset
# ... prepare train_data, val_data in PyG Data format ...
# Train the model
config = TrainingConfig(
learning_rate=1e-3,
hidden_dim=64,
num_layers=2,
epochs=50,
task="node_classification",
)
model = create_gnn_router(
model_type=GNNModelType.GAT,
in_channels=node_features.shape[1],
out_channels=2,
config=config,
)
trainer = GNNTrainer(model, config)
result = trainer.train(train_data, val_data)
print(f"Best validation accuracy: {result['best_val_acc']:.3f}")
trainer.save("gnn_router.pt")
# Use the trained model for routing
router = GNNRouterInference(model, feature_gen)
prediction = router.predict(
graph,
source="coordinator",
candidates=["agent1", "agent2", "agent3"],
metrics_tracker=tracker,
)
print(f"Recommended: {prediction.recommended_nodes[0]}")
print(f"Confidence: {prediction.confidence:.3f}")
```
### Example 9: Adaptive execution with a budget
```python
from execution import (
MACPRunner,
RunnerConfig,
RoutingPolicy,
PruningConfig,
)
from execution.budget import BudgetConfig
from execution.errors import BudgetExceededError, ExecutionError
# Configure adaptive execution
config = RunnerConfig(
adaptive=True,
enable_parallel=True,
max_parallel_size=3,
routing_policy=RoutingPolicy.WEIGHTED_TOPO,
pruning_config=PruningConfig(
min_weight_threshold=0.1,
token_budget=5000,
enable_fallback=True,
max_fallback_attempts=2,
),
budget_config=BudgetConfig(
total_token_limit=10000,
node_token_limit=2000,
max_prompt_length=3000,
warn_at_usage_ratio=0.8,
),
timeout=60.0,
max_retries=2,
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# Execute
try:
result = runner.run_round(graph)
print(f"Executed agents: {len(result.execution_order)}")
print(f"Pruned agents: {result.pruned_agents}")
print(f"Topology changes: {result.topology_changed_count}")
print(f"Fallback count: {result.fallback_count}")
print(f"Total tokens: {result.total_tokens}")
except BudgetExceededError as e:
print(f"Budget exceeded: {e}")
except ExecutionError as e:
print(f"Execution failed: {e}")
```
### Example 10: Graph analysis with algorithms
```python
from core.algorithms import (
GraphAlgorithms,
CentralityType,
PathMetric,
)
# Create a complex graph
algo = GraphAlgorithms(graph)
# Find critical nodes
centrality = algo.centrality(CentralityType.BETWEENNESS, normalized=True)
print(f"Most critical agents: {centrality.top_nodes[:3]}")
# Find alternative paths
paths = algo.k_shortest_paths(
source="input",
target="output",
k=3,
metric=PathMetric.WEIGHTED,
)
print(f"Found {len(paths)} alternative paths:")
for i, path in enumerate(paths, 1):
print(f" Path {i}: {' -> '.join(path.nodes)} (cost: {path.cost:.2f})")
# Detect communities
communities = algo.detect_communities(algorithm="louvain")
print(f"Communities found: {len(communities.communities)}")
for i, community in enumerate(communities.communities):
print(f" Community {i}: {community}")
# Cycle check
cycles = algo.find_cycles(max_length=5)
if cycles.has_cycles:
print(f"β οΈ Graph has {len(cycles.cycles)} cycles!")
else:
print("β Graph is acyclic (DAG)")
```
### Example 11: Multi-model system with cost optimization
```python
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory, RunnerConfig, StdoutCallbackHandler
# Build a graph with different models for different tasks
builder = GraphBuilder()
# Stage 1: Data collection (5 parallel agents, cheap model)
for i in range(5):
builder.add_agent(
f"collector_{i}",
display_name=f"Data Collector {i}",
persona="Collects and formats raw data",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.2,
max_tokens=500,
)
builder.add_workflow_edge(f"collector_{i}", "analyst")
# Stage 2: Deep analysis (1 agent, strong model)
builder.add_agent(
"analyst",
display_name="Senior Data Analyst",
persona="Expert analyst with deep statistical knowledge",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0,
max_tokens=4000,
)
builder.add_workflow_edge("analyst", "privacy_checker")
# Stage 3: Privacy compliance check (local model)
builder.add_agent(
"privacy_checker",
display_name="Privacy Compliance Checker",
persona="Ensures data privacy and compliance",
llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1",
api_key="not-needed",
temperature=0.0,
max_tokens=1000,
)
builder.add_workflow_edge("privacy_checker", "reporter")
# Stage 4: Report generation (cheap model)
builder.add_agent(
"reporter",
display_name="Report Generator",
persona="Formats analysis into readable reports",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.5,
max_tokens=2000,
)
builder.set_task(
query="Analyze Q4 sales data and generate a compliance report",
description="Full pipeline from data collection to the final report",
)
graph = builder.build()
# Print configuration
print("=== Multi-Model Pipeline Configuration ===\n")
for agent in graph.agents:
if hasattr(agent, 'llm_config') and agent.llm_config:
config = agent.llm_config
print(f"{agent.display_name}:")
print(f" Model: {config.model_name}")
print(f" Endpoint: {config.base_url}")
print(f" Temp: {config.temperature}, Max tokens: {config.max_tokens}")
print()
# Create factory and runner
factory = LLMCallerFactory.create_openai_factory()
config = RunnerConfig(
enable_parallel=True,
max_parallel_size=5, # Collectors run in parallel
timeout=120.0,
callbacks=[StdoutCallbackHandler()], # Execution monitoring
)
runner = MACPRunner(
llm_factory=factory,
config=config,
)
# Execute
print("=== Executing Multi-Model Pipeline ===\n")
result = runner.run_round(graph)
print(f"\n=== Results ===")
print(f"Execution order: {' β '.join(result.execution_order)}")
print(f"Total time: {result.total_time:.2f}s")
print(f"Total tokens: {result.total_tokens}")
print(f"\nFinal report:\n{result.final_answer}")
# Token usage analysis by model
from collections import defaultdict
costs_by_model = defaultdict(int)
for agent_id in result.execution_order:
agent = graph.get_agent_by_id(agent_id)
model = agent.llm_config.model_name if agent.llm_config else "default"
tokens = result.messages.get(agent_id, {}).get("tokens", 0)
costs_by_model[model] += tokens
print(f"\n=== Token Usage by Model ===")
for model, tokens in costs_by_model.items():
print(f"{model}: {tokens} tokens")
# Savings calculation
# gpt-4: $30/$60 per 1M tokens (input/output)
# gpt-4o-mini: $0.15/$0.60 per 1M tokens
# llama3 (local): $0
gpt4_tokens = costs_by_model.get("gpt-4", 0)
mini_tokens = costs_by_model.get("gpt-4o-mini", 0)
llama_tokens = costs_by_model.get("llama3:70b", 0)
actual_cost = (gpt4_tokens * 45 / 1_000_000) + (mini_tokens * 0.375 / 1_000_000)
if_all_gpt4_cost = (gpt4_tokens + mini_tokens + llama_tokens) * 45 / 1_000_000
print(f"\n=== Cost Analysis ===")
print(f"Actual cost: ${actual_cost:.4f}")
print(f"Cost if all GPT-4: ${if_all_gpt4_cost:.4f}")
print(f"Savings: ${if_all_gpt4_cost - actual_cost:.4f} ({((1 - actual_cost/if_all_gpt4_cost)*100):.1f}%)")
```
---
### Token budget
Resource management for execution (tokens, requests, time).
```python
from execution.budget import (
Budget,
BudgetConfig,
NodeBudget,
BudgetTracker,
)
# Budget tracks a single resource (tokens, requests, or time)
token_budget = Budget(limit=50000)
print(f"Available: {token_budget.available}")
print(f"Usage ratio: {token_budget.usage_ratio:.1%}")
can_spend = token_budget.can_spend(100) # Check before using
token_budget.spend(100) # Record usage
# Per-node budget (composed of Budget objects)
node_budget = NodeBudget(
node_id="solver",
tokens=Budget(limit=2000),
requests=Budget(limit=10),
time_seconds=Budget(limit=60),
)
# Budget tracker, configured via BudgetConfig
config = BudgetConfig(
total_token_limit=50000, # Global token limit
total_request_limit=100, # Global request limit
total_time_limit_seconds=600, # Global time limit (10 min)
node_token_limit=2000, # Per-node token limit
max_prompt_length=4000, # Max chars in a prompt
max_response_length=2000, # Max chars in a response
warn_at_usage_ratio=0.8, # Warn at 80%
)
tracker = BudgetTracker(config=config)
tracker.start() # Start the timer
# Availability check
can_run, reason = tracker.can_execute("solver", estimated_tokens=100)
if can_run:
# Record usage after execution
tracker.record_usage(
node_id="solver",
prompt_tokens=80,
completion_tokens=120,
latency_seconds=1.5,
)
# Prompt/response truncation when exceeding limits
prompt = "a very long prompt..."
truncated = tracker.truncate_prompt(prompt)
# Budget summary
summary = tracker.get_summary()
print(f"Tokens used: {summary['global']['tokens']['used']}")
print(f"Time elapsed: {summary['global']['elapsed_seconds']:.1f}s")
# Reset
tracker.reset()
```
#### Integration with RunnerConfig
```python
from execution import RunnerConfig, BudgetConfig
config = RunnerConfig(
budget_config=BudgetConfig(
total_token_limit=50000,
node_token_limit=2000,
max_prompt_length=4000,
warn_at_usage_ratio=0.8,
),
)
```
---
### Error handling
Structured exceptions and error-handling policies.
```python
from execution.errors import (
ExecutionError,
TimeoutError,
RetryExhaustedError,
BudgetExceededError,
AgentNotFoundError,
ValidationError,
ErrorPolicy,
ErrorAction,
ExecutionMetrics,
)
# Error policy
error_policy = ErrorPolicy(
on_timeout=ErrorAction.RETRY, # retry, skip, prune, fallback, rollback, abort
on_retry_exhausted=ErrorAction.PRUNE,
on_budget_exceeded=ErrorAction.ABORT,
on_validation_error=ErrorAction.ABORT,
on_agent_not_found=ErrorAction.SKIP,
on_unknown_error=ErrorAction.SKIP,
max_skipped_agents=5,
abort_on_critical_path=True,
)
# Apply in configuration
config = RunnerConfig(
error_policy=error_policy,
max_retries=3,
timeout=60.0,
)
# Error handling
try:
result = runner.run_round(graph)
except TimeoutError as e:
print(f"Timeout: {e}")
except RetryExhaustedError as e:
print(f"Retries exhausted: {e}")
except BudgetExceededError as e:
print(f"Budget exceeded: {e}")
except ExecutionError as e:
print(f"Execution error: {e}")
# Access metrics
metrics: ExecutionMetrics = e.metrics
print(f"Retries: {metrics.retry_count}")
print(f"Fallbacks: {metrics.fallback_count}")
# Get metrics from the result
if result.errors:
for error in result.errors:
print(f"{error['agent_id']}: {error['type']} - {error['message']}")
```
---
### Graph Algorithms
A service layer for graph analysis using `rustworkx` algorithms.
```python
from core.algorithms import (
GraphAlgorithms,
CentralityType,
PathMetric,
SubgraphFilter,
)
algo = GraphAlgorithms(graph)
# K shortest paths
paths = algo.k_shortest_paths(
source="researcher",
target="writer",
k=3,
metric=PathMetric.HOP_COUNT, # HOP_COUNT, WEIGHTED, RELIABILITY
edge_weights=None, # or custom weights
)
for i, path in enumerate(paths):
print(f"Path {i+1}: {path.nodes} (cost={path.cost:.2f})")
# Node centrality
centrality = algo.centrality(
centrality_type=CentralityType.BETWEENNESS, # DEGREE, BETWEENNESS, CLOSENESS, EIGENVECTOR, PAGERANK
normalized=True,
)
print(f"Most central node: {centrality.top_nodes[0]}")
print(f"Scores: {centrality.scores}")
# Community detection
communities = algo.detect_communities(algorithm="louvain") # louvain, label_propagation
print(f"Communities found: {len(communities.communities)}")
print(f"Modularity: {communities.modularity:.3f}")
# Cycle search
cycles = algo.find_cycles(max_length=5)
if cycles.has_cycles:
print(f"Cycles found: {len(cycles.cycles)}")
for cycle in cycles.cycles:
print(f" {cycle}")
# Subgraph filtering
subgraph_filter = SubgraphFilter(
include_node_ids=["a", "b", "c"],
min_edge_weight=0.5,
max_hop_distance=2,
from_node="a",
)
subgraph = algo.filter_subgraph(subgraph_filter)
print(f"Nodes in subgraph: {len(subgraph.node_ids)}")
# Reachability analysis
reachable = algo.get_reachable_nodes("start", max_distance=3)
print(f"Reachable nodes: {reachable}")
# Topological order
if algo.is_dag():
topo_order = algo.topological_sort()
print(f"Topological order: {topo_order}")
```
---
### Metrics Tracker
Collects and aggregates performance metrics for nodes and edges.
```python
from core.metrics import (
MetricsTracker,
NodeMetrics,
EdgeMetrics,
MetricAggregator,
ExponentialMovingAverage,
SlidingWindowAverage,
)
tracker = MetricsTracker()
# Record node metrics
tracker.record_node_execution(
node_id="solver",
success=True,
latency_ms=150,
cost_tokens=200,
quality=0.95,
)
# Record edge metrics
tracker.record_edge_traversal(
source="solver",
target="checker",
weight=0.9,
success=True,
latency_ms=50,
)
# Get node metrics
metrics: NodeMetrics = tracker.get_node_metrics("solver")
print(f"Reliability: {metrics.reliability:.3f}")
print(f"Avg latency: {metrics.avg_latency_ms:.1f}ms")
print(f"Total cost: {metrics.total_cost_tokens}")
print(f"Avg quality: {metrics.avg_quality:.3f}")
print(f"Executions: {metrics.execution_count}")
# Get edge metrics
edge_metrics: EdgeMetrics = tracker.get_edge_metrics("solver", "checker")
print(f"Edge reliability: {edge_metrics.reliability:.3f}")
print(f"Traversals: {edge_metrics.traversal_count}")
# Snapshot of all metrics
snapshot = tracker.snapshot()
print(f"Timestamp: {snapshot.timestamp}")
print(f"Node metrics: {snapshot.node_metrics}")
print(f"Edge metrics: {snapshot.edge_metrics}")
# Metrics history (if enabled)
tracker = MetricsTracker(keep_history=True, history_window=100)
# ... records ...
history = tracker.get_history(node_id="solver")
for snapshot in history.snapshots:
print(f"{snapshot.timestamp}: {snapshot.metrics}")
# Custom aggregators
ema = ExponentialMovingAverage(alpha=0.1)
tracker.set_aggregator("solver", "latency", ema)
swa = SlidingWindowAverage(window_size=10)
tracker.set_aggregator("checker", "quality", swa)
# Export metrics
data = tracker.to_dict()
tracker.save("metrics.json")
# Load metrics
tracker = MetricsTracker.load("metrics.json")
```
---
### Visualization
Tools for visualizing graphs in different formats. All visualization styles are based on **Pydantic models** for validation and type safety.
#### Core classes
```python
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
MermaidDirection,
NodeShape,
NodeStyle,
EdgeStyle,
# Convenience functions
to_mermaid,
to_ascii,
to_dot,
print_graph,
render_to_image,
show_graph_interactive,
)
```
#### 1. Quick usage (convenience functions)
```python
# Simple Mermaid
mermaid_code = to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT)
print(mermaid_code)
# Simple ASCII
ascii_art = to_ascii(graph, show_edges=True)
print(ascii_art)
# Simple DOT
dot_code = to_dot(graph, graph_name="MyGraph")
print(dot_code)
# Print to console (auto-selects Rich or ASCII)
print_graph(graph, format="auto") # "auto", "colored", "ascii", "mermaid"
# Render to image (requires system Graphviz)
render_to_image(graph, "output.png", format="png", dpi=300)
render_to_image(graph, "output.svg", format="svg")
# Interactive view (opens in system viewer)
show_graph_interactive(graph, graph_name="MyWorkflow")
```
#### 2. Advanced usage (GraphVisualizer with custom styles)
**VisualizationStyle**, **NodeStyle**, **EdgeStyle** are Pydantic models with field validation.
```python
# Create custom node styles (Pydantic models)
agent_style = NodeStyle(
shape=NodeShape.ROUND, # RECTANGLE, ROUND, STADIUM, CIRCLE, DIAMOND, etc.
fill_color="#e3f2fd", # Fill color
stroke_color="#1976d2", # Border color
text_color="#000000", # Text color
icon="π€", # Emoji icon
)
task_style = NodeStyle(
shape=NodeShape.DIAMOND,
fill_color="#fff3e0",
stroke_color="#f57c00",
icon="π",
)
# Edge styles (Pydantic models)
workflow_edge = EdgeStyle(
line_style="solid", # solid, dashed, dotted
arrow_head="normal", # normal, none, diamond
color="#1976d2",
label_color="#333333",
)
task_edge = EdgeStyle(
line_style="dashed",
color="#f57c00",
)
# Global visualization style (Pydantic model)
style = VisualizationStyle(
direction=MermaidDirection.LEFT_RIGHT, # TOP_BOTTOM, BOTTOM_TOP, LEFT_RIGHT, RIGHT_LEFT
agent_style=agent_style,
task_style=task_style,
workflow_edge_style=workflow_edge,
task_edge_style=task_edge,
show_weights=True, # Show edge weights
show_probabilities=False, # Show probabilities
show_tools=True, # Show agent tools
show_descriptions=False, # Show descriptions
max_label_length=30, # Max label length
)
# Create a visualizer with custom style
viz = GraphVisualizer(graph, style)
# Mermaid diagrams
mermaid = viz.to_mermaid(
direction=MermaidDirection.TOP_BOTTOM, # Can override style
title="Agent Workflow", # Diagram title
)
print(mermaid)
# Save Mermaid to a file
viz.save_mermaid("graph.md", title="My Workflow") # Wraps in ```mermaid```
viz.save_mermaid("graph.mmd", title="My Workflow") # Raw .mmd without wrapper
# ASCII art for terminal
ascii_art = viz.to_ascii(
show_edges=True,
box_width=20,
)
print(ascii_art)
# Graphviz DOT
dot = viz.to_dot(
graph_name="AgentGraph",
rankdir="LR", # TB, LR, BT, RL
)
viz.save_dot("graph.dot", graph_name="AgentGraph")
# Render to image (requires installed Graphviz)
viz.render_image(
"output.png",
format="png", # png, svg, pdf, jpg
dpi=300, # For raster formats
graph_name="MyGraph",
)
# Interactive view
viz.show_interactive(graph_name="MyGraph") # Opens system viewer
# Adjacency matrix (text representation)
matrix = viz.to_adjacency_matrix(show_labels=True)
print(matrix)
```
#### 3. Colored terminal output (Rich Console)
```python
# Automatic colored output (if Rich is installed)
print_graph(graph, format="colored")
# Or via visualizer
viz = GraphVisualizer(graph)
viz.print_colored() # Pretty output with trees, tables, and colors
```
#### 4. Full configuration example
```python
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
NodeStyle,
EdgeStyle,
NodeShape,
MermaidDirection,
)
# Fully configured style
custom_style = VisualizationStyle(
direction=MermaidDirection.LEFT_RIGHT,
agent_style=NodeStyle(
shape=NodeShape.ROUND,
fill_color="#bbdefb",
stroke_color="#0d47a1",
icon="π€",
),
task_style=NodeStyle(
shape=NodeShape.DIAMOND,
fill_color="#ffe0b2",
stroke_color="#e65100",
icon="π",
),
workflow_edge_style=EdgeStyle(
line_style="solid",
color="#1976d2",
),
task_edge_style=EdgeStyle(
line_style="dashed",
color="#f57c00",
),
show_weights=True,
show_tools=True,
max_label_length=40,
)
viz = GraphVisualizer(graph, custom_style)
# Generate all formats
viz.save_mermaid("docs/graph.md", title="Workflow")
viz.save_dot("docs/graph.dot")
viz.render_image("docs/graph.png", format="png", dpi=150)
viz.render_image("docs/graph.svg", format="svg")
print(viz.to_ascii())
```
#### 5. Installing Graphviz for image rendering
For `render_image()` and `render_to_image()` you need:
1. Python library: `pip install graphviz`
2. System Graphviz:
- Ubuntu/Debian: `sudo apt install graphviz`
- macOS: `brew install graphviz`
- Windows: `winget install graphviz` or https://graphviz.org/download/
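A quick way to confirm the toolchain works end to end (a minimal sketch; it simply reuses `render_to_image` from above and reports the failure you get when the system binaries are missing):
```python
from core.visualization import render_to_image

try:
    render_to_image(graph, "check.png", format="png")
    print("Graphviz rendering works")
except Exception as exc:  # raised when the 'dot' executable is not on PATH
    print(f"Graphviz not available: {exc}")
```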
---
### Schema System
A complete system of **Pydantic schemas** for type-safe validation, serialization, and migration of graph data. All schemas inherit from `pydantic.BaseModel` and provide automatic type validation, default values, and data conversion.
#### Core schema classes
```python
from core.schema import (
# Versioning
SCHEMA_VERSION,
SchemaVersion,
# Node and edge types
NodeType,
EdgeType,
# Node schemas (Pydantic BaseModel)
BaseNodeSchema,
AgentNodeSchema,
TaskNodeSchema,
# Edge schemas (Pydantic BaseModel)
BaseEdgeSchema,
WorkflowEdgeSchema,
CostMetrics,
# Graph schema (Pydantic BaseModel)
GraphSchema,
# LLM configuration (Pydantic BaseModel)
LLMConfig,
# Validation (Pydantic BaseModel)
ValidationResult,
SchemaValidator,
# Migrations
SchemaMigration,
MigrationRegistry,
migrate_schema,
)
```
#### 1. Creating node schemas (Pydantic models)
```python
# Agent with a full LLM configuration
agent_node = AgentNodeSchema(
id="solver",
type=NodeType.AGENT,
display_name="Math Solver",
persona="You are an expert mathematician",
description="Solves complex math problems step by step",
tools=["calculator", "wolfram_alpha"],
# LLM configuration (Pydantic model)
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0,
max_tokens=2000,
# Metrics and state
trust_score=0.95,
quality_score=0.9,
success_rate=1.0,
total_calls=0,
total_tokens_used=0,
# Pydantic validates embedding automatically
embedding=[0.1, 0.2, 0.3], # Can be a list or torch.Tensor
embedding_dim=3, # Auto-filled if None
# Metadata (arbitrary data)
metadata={"priority": "high", "category": "math"},
tags={"solver", "math", "primary"},
)
# Task
task_node = TaskNodeSchema(
id="main_task",
type=NodeType.TASK,
query="Solve: x^2 + 5x + 6 = 0",
description="Main mathematical task",
expected_output="Two solutions: x1, x2",
max_iterations=10,
status="pending", # pending, running, completed, failed
)
# Extract LLM configuration from the agent
llm_config: LLMConfig = agent_node.get_llm_config()
print(f"Model: {llm_config.model_name}")
print(f"Configured: {llm_config.is_configured()}")
print(f"Generation params: {llm_config.to_generation_params()}")
# Check whether an LLM configuration exists
if agent_node.has_llm_config():
print("Agent has LLM configuration")
```
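Because these are Pydantic models, a wrong field type fails fast at construction time. A minimal sketch (hedged: it assumes `trust_score` is a float field, as in the example above; the exact error message depends on your Pydantic version):
```python
from pydantic import ValidationError
from core.schema import AgentNodeSchema

try:
    AgentNodeSchema(id="bad_agent", display_name="Bad Agent", trust_score="not-a-number")
except ValidationError as exc:
    # First validation error, e.g. "Input should be a valid number"
    print(exc.errors()[0]["msg"])
```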
#### 2. Creating edge schemas (Pydantic models)
```python
# Base edge with cost metrics (Pydantic model)
edge = BaseEdgeSchema(
source="solver",
target="checker",
type=EdgeType.WORKFLOW,
weight=1.0,
probability=0.95,
bidirectional=False,
# Cost metrics (Pydantic model)
cost=CostMetrics(
estimated_tokens=500,
actual_tokens=None,
latency_ms=150.0,
timeout_ms=5000.0,
trust=0.9,
reliability=0.95,
cost_usd=0.01,
custom={"priority": 1.0},
),
# Pydantic validates attr automatically
attr=[1.0, 0.95, 0.9], # Can be a list or torch.Tensor
attr_dim=3, # Auto-filled if None
metadata={"route": "primary"},
)
# Workflow edge with conditional routing
conditional_edge = WorkflowEdgeSchema(
source="solver",
target="checker",
type=EdgeType.WORKFLOW,
weight=0.9,
probability=1.0,
# Conditional routing
condition="source_success", # Name of a built-in or registered condition
priority=1, # Priority (higher = checked earlier)
transform="extract_answer", # Optional data transform
is_conditional=True, # Auto-set if condition is provided
)
# Get edge features
feature_vector = edge.get_feature_vector(feature_names=["trust", "reliability"])
print(f"Features: {feature_vector}")
# Convert to torch.Tensor
attr_tensor = edge.to_attr_tensor()
print(f"Attr tensor: {attr_tensor}")
```
#### 3. Full graph schema (Pydantic model)
```python
from datetime import datetime
# GraphSchema - the main Pydantic model
schema = GraphSchema(
schema_version=SCHEMA_VERSION, # "2.0.0"
name="Math Pipeline",
description="A workflow for solving mathematical problems",
created_at=datetime.now(),
updated_at=datetime.now(),
# nodes is dict[str, BaseNodeSchema], not a list!
nodes={
"solver": AgentNodeSchema(
id="solver",
display_name="Math Solver",
description="Solves math problems",
tools=["calculator"],
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
),
"checker": AgentNodeSchema(
id="checker",
display_name="Answer Checker",
description="Validates solutions",
llm_backbone="gpt-4o-mini",
),
"__task__": TaskNodeSchema(
id="__task__",
query="Solve: x^2 + 5x + 6 = 0",
),
},
edges=[
WorkflowEdgeSchema(
source="solver",
target="checker",
weight=0.9,
type=EdgeType.WORKFLOW,
),
],
# Feature names for feature extraction
node_feature_names=["trust_score", "quality_score"],
edge_feature_names=["trust", "reliability"],
# Metadata
metadata={
"created_by": "user@example.com",
"purpose": "math_pipeline",
"version": "1.0",
},
)
# Add nodes and edges
new_agent = AgentNodeSchema(
id="reviewer",
display_name="Reviewer",
)
schema.add_node(new_agent)
new_edge = BaseEdgeSchema(
source="checker",
target="reviewer",
)
schema.add_edge(new_edge)
# Retrieve nodes and edges
solver_node = schema.get_node("solver")
edges_from_solver = schema.get_edges(source="solver")
edges_to_checker = schema.get_edges(target="checker")
# Compute feature dimensionalities
schema.compute_feature_dims()
print(f"Node feature dim: {schema.node_feature_dim}")
print(f"Edge feature dim: {schema.edge_feature_dim}")
```
#### 4. Serialization and validation (Pydantic)
```python
# Serialization (Pydantic methods)
schema_dict = schema.model_dump() # Dict[str, Any]
schema_json = schema.model_dump_json(indent=2) # JSON string
# Or a specialized method
schema_data = schema.to_dict()
# Deserialization (Pydantic methods)
loaded_schema = GraphSchema.model_validate(schema_dict)
loaded_from_json = GraphSchema.model_validate_json(schema_json)
# Schema validation (returns ValidationResult - Pydantic model)
validator = SchemaValidator(
check_cycles=True,
check_duplicates=True,
check_orphans=True,
check_connectivity=False,
)
result: ValidationResult = validator.validate(schema)
if result.valid:
    print("✅ Schema is valid")
else:
    print("❌ Validation errors:")
    for error in result.errors:
        print(f" - {error}")
if result.warnings:
    print("⚠️ Warnings:")
    for warning in result.warnings:
        print(f" - {warning}")
```
#### 5. Schema migration between versions
```python
# Automatic migration of legacy data
old_data = {
"schema_version": "1.0.0",
"agents": [ # Old format (agents list)
{"agent_id": "solver", "display_name": "Solver"},
],
"edges": [
{"source": "solver", "target": "checker"},
],
}
# Migrate to the current version (2.0.0)
migrated_data = migrate_schema(old_data)
print(f"Migrated to version: {migrated_data['schema_version']}")
# Create a custom migration
from core.schema import SchemaMigration, register_migration
class MyCustomMigration(SchemaMigration):
from_version = "1.5.0"
to_version = "2.0.0"
def migrate(self, data: dict) -> dict:
# Your migration logic
data["new_field"] = "default_value"
return data
# Register migration
register_migration(MyCustomMigration())
```
#### 6. Versioning
```python
# Check schema version
current_version = SchemaVersion.parse(SCHEMA_VERSION) # "2.0.0"
print(f"Current: {current_version}")
old_version = SchemaVersion.parse("1.5.0")
print(f"Compatible: {current_version.is_schema_compatible(old_version)}") # False (different major versions)
print(f"Newer: {current_version > old_version}") # True
```
#### Benefits of Pydantic schemas
1. **Automatic type validation** - Pydantic checks types when creating objects
2. **Default values** - fields are auto-populated
3. **Type conversion** - automatic conversion (torch.Tensor → list)
4. **Serialization/deserialization** - built-in `.model_dump()`, `.model_validate()`
5. **Extensibility** - `extra="allow"` enables arbitrary fields
6. **Immutability** - `frozen=True` for immutable models
7. **Documentation** - automatic JSON Schema generation (see the sketch below)
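A minimal sketch of points 4 and 7. `model_dump_json()` and `model_json_schema()` are standard Pydantic v2 methods; the `AgentNodeSchema` fields follow the examples above:
```python
from core.schema import AgentNodeSchema

agent = AgentNodeSchema(id="solver", display_name="Math Solver")

# Point 4: built-in serialization
print(agent.model_dump_json()[:80], "...")

# Point 7: automatic JSON Schema generation
json_schema = AgentNodeSchema.model_json_schema()
print(sorted(json_schema["properties"])[:5])
```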
---
#### 7. Agent input/output validation
**New:** Each agent can have an **input_schema** and an **output_schema** to validate incoming data and outputs. This allows you to:
- Guarantee data correctness
- Automatically parse structured outputs
- Catch invalid LLM outputs
- Generate JSON Schema for prompts
> **Prompt injection:** `_build_prompt` automatically injects schemas into the LLM prompt.
> - `output_schema` → system message: `"Respond with JSON matching: {schema}"`
> - `input_schema` → user message: `"Input format: {schema}"`
>
> The schemas are serialized as compact JSON (no extra whitespace) to minimize token usage.
> No manual prompt engineering is required.
##### Imports
```python
from pydantic import BaseModel
from core.schema import (
AgentNodeSchema,
SchemaValidationResult, # Validation result
)
from builder import GraphBuilder
```
##### 7.1. Create an agent with Pydantic schemas
```python
# Define input/output schemas as Pydantic models
class SolverInput(BaseModel):
question: str
context: str | None = None
difficulty: int = 1
class SolverOutput(BaseModel):
answer: str
confidence: float # 0.0 - 1.0
explanation: str | None = None
# Create an agent with validation
builder = GraphBuilder()
builder.add_agent(
"solver",
display_name="Math Solver",
persona="Expert mathematician",
description="Solves mathematical problems",
# Schemas for validation
input_schema=SolverInput,
output_schema=SolverOutput,
# LLM configuration
llm_backbone="gpt-4",
temperature=0.0,
)
graph = builder.build()
```
##### 7.2. Using JSON Schema (without Pydantic)
You can pass a plain dict with JSON Schema:
```python
# JSON Schema directly (without Pydantic models)
input_schema = {
"type": "object",
"properties": {
"question": {"type": "string"},
"context": {"type": "string"},
},
"required": ["question"]
}
output_schema = {
"type": "object",
"properties": {
"answer": {"type": "string"},
"confidence": {"type": "number"},
},
"required": ["answer", "confidence"]
}
builder.add_agent(
"solver",
input_schema=input_schema, # JSON Schema dict
output_schema=output_schema, # JSON Schema dict
)
```
##### 7.3. Validation via RoleGraph
```python
# Check whether schemas exist
has_input = graph.has_input_schema("solver") # True
has_output = graph.has_output_schema("solver") # True
# Validate input data
result: SchemaValidationResult = graph.validate_agent_input(
"solver",
{"question": "Solve x^2 + 5x + 6 = 0"}
)
if result.valid:
print("β
Input is valid")
print(f"Validated data: {result.validated_data}")
else:
print("β Input validation failed")
print(f"Errors: {result.errors}")
# Validate output data (JSON string or dict)
response = '{"answer": "x1=-2, x2=-3", "confidence": 0.95}'
result = graph.validate_agent_output("solver", response)
if result.valid:
parsed = result.validated_data
print(f"Answer: {parsed['answer']}")
print(f"Confidence: {parsed['confidence']}")
else:
print(f"Invalid output: {result.errors}")
# You can raise an exception
result.raise_if_invalid() # -> ValueError
```
##### 7.4. Getting JSON Schema for prompts
```python
import json

# Get JSON Schema for LLM instructions
input_schema_json = graph.get_input_schema_json("solver")
output_schema_json = graph.get_output_schema_json("solver")
# Use in the prompt
prompt = f"""You are a math solver.
INPUT FORMAT:
{json.dumps(input_schema_json, indent=2)}
You MUST respond in the following JSON format:
{json.dumps(output_schema_json, indent=2)}
Now solve: {{question}}
"""
```
##### 7.5. Validation directly via AgentNodeSchema
```python
# Create an agent with schemas
agent = AgentNodeSchema(
id="solver",
display_name="Math Solver",
input_schema=SolverInput,
output_schema=SolverOutput,
)
# Validate
result = agent.validate_input({"question": "2+2=?"})
print(f"Valid: {result.valid}")
result = agent.validate_output('{"answer": "4", "confidence": 0.99}')
print(f"Valid: {result.valid}, data: {result.validated_data}")
# Check schema presence
if agent.has_input_schema():
print("Agent has input schema")
if agent.has_output_schema():
print("Agent has output schema")
```
##### 7.6. Handling invalid LLM outputs
```python
# Scenario: the LLM responds in the wrong format
response = llm_call(prompt)
result = graph.validate_agent_output("solver", response)
if not result.valid:
# Option 1: Retry with a stricter prompt
retry_prompt = f"{prompt}\n\nβ οΈ IMPORTANT: You MUST respond with valid JSON!"
response = llm_call(retry_prompt)
result = graph.validate_agent_output("solver", response)
if not result.valid:
# Option 2: Fallback to default values
parsed = {
"answer": response,
"confidence": 0.5,
"explanation": "LLM failed to format correctly"
}
else:
parsed = result.validated_data
else:
parsed = result.validated_data
print(f"Final answer: {parsed['answer']}")
```
##### 7.7. SchemaValidationResult API
```python
class SchemaValidationResult(BaseModel):
"""Schema validation result."""
valid: bool # True if data is valid
schema_type: str # "input" or "output"
errors: list[str] # Validation errors
warnings: list[str] # Validation warnings
validated_data: dict[str, Any] | None # Validated data
message: str # Additional message
# Methods
result.raise_if_invalid() # Raise ValueError if invalid
```
##### 7.8. Serialization support
When saving a graph:
- **Pydantic models** (`input_schema`/`output_schema`) are **NOT** serialized (exclude=True)
- **JSON Schema** (`input_schema_json`/`output_schema_json`) **is** serialized
```python
# When creating an agent with a Pydantic model
agent = AgentNodeSchema(
id="solver",
input_schema=SolverInput, # Not serialized
output_schema=SolverOutput, # Not serialized
)
# JSON Schema is extracted automatically
print(agent.input_schema_json) # {'type': 'object', 'properties': {...}}
print(agent.output_schema_json) # {'type': 'object', 'properties': {...}}
# When deserializing a graph from JSON
# Pydantic models are lost, but JSON Schema remains
# Validation works via basic type checks
```
##### When should you use input/output schemas?
| Scenario | Recommendation |
|----------|----------------|
| **Structured data** | ✅ Use Pydantic schemas |
| **JSON outputs from an LLM** | ✅ Required! Parsing and validation |
| **Free-form text** | ❌ Not needed |
| **API integration** | ✅ Guarantees correct data |
| **Debugging** | ✅ Quickly surfaces issues |
##### Performance impact
- ✅ **Validation does not consume tokens** - it is pure Python
- ⚠️ **Prompt instructions consume tokens** - embedding JSON Schema into prompts increases token usage (see the sketch below)
- ⚡ **Validation is fast** - Pydantic is optimized for speed
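To see why compact serialization matters, a quick sketch reusing the `SolverOutput` model from 7.1 (the length difference is illustrative; actual numbers depend on your model):
```python
import json

schema = SolverOutput.model_json_schema()
compact = json.dumps(schema, separators=(",", ":"))
pretty = json.dumps(schema, indent=2)
print(f"compact: {len(compact)} chars, pretty: {len(pretty)} chars")
# The compact form is what gets embedded in prompts, keeping token overhead down
```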
##### Validation FAQ
**Q: Is this required?**
A: No, it is fully optional. If schemas are not set, validation is skipped.
**Q: What if the LLM cannot respond in the required format?**
A: `validate_output()` returns `valid=False` plus errors. Options: retry/fallback/ignore.
**Q: Can I pass plain JSON Schema?**
A: Yes. Pass a dict with JSON Schema instead of a Pydantic model.
**Q: Does token usage increase?**
A: Validation does not consume tokens. But including JSON Schema in prompts does increase token usage.
---
### Builder API (Detailed)
Different ways to construct graphs.
#### 1. build_property_graph (quick construction)
```python
from builder import build_property_graph
graph = build_property_graph(
agents=[agent1, agent2, agent3],
workflow_edges=[("agent1", "agent2"), ("agent2", "agent3")],
context_edges=[("agent1", "agent3")], # Additional connections
query="Solve this task",
include_task_node=True, # Add a task node
task_node_id="__task__", # Task node ID
connect_task_to_all=False, # Connect task to all agents
edge_weights=None, # Custom edge weights
default_weight=1.0, # Default weight
bidirectional=False, # Bidirectional edges
encoder=None, # NodeEncoder for embeddings
compute_embeddings=False, # Compute embeddings immediately
)
```
#### 2. GraphBuilder (fluent API)
```python
from builder import GraphBuilder
builder = GraphBuilder()
# Add agents (basic)
builder.add_agent(
agent_id="researcher",
display_name="Researcher",
description="Does research",
tools=["search", "read"],
)
# Add an agent with multi-model configuration
builder.add_agent(
agent_id="analyst",
display_name="Senior Analyst",
persona="Expert data analyst",
# LLM configuration
llm_backbone="gpt-4", # Model name
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY", # Or $ENV_VAR
temperature=0.7,
max_tokens=2000,
timeout=60.0,
top_p=0.9,
stop_sequences=["END", "STOP"],
)
# Or via an LLMConfig object
from core.schema import LLMConfig
llm_config = LLMConfig(
model_name="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.7,
max_tokens=2000,
)
builder.add_agent(
agent_id="writer",
display_name="Writer",
llm_config=llm_config, # Pass a ready configuration
)
# Add edges
builder.add_workflow_edge("researcher", "writer", weight=0.9)
builder.add_context_edge("researcher", "writer", weight=0.5)
# Add a task
builder.set_task(query="Write a report", description="Main task")
# Conditional edges
def quality_check(state: dict) -> bool:
return state.get("quality_score", 0) > 0.8
builder.add_conditional_edge(
source="writer",
target="editor",
condition=quality_check,
weight=0.9,
)
# Set execution bounds (new!)
builder.set_start_node("researcher") # Start node
builder.set_end_node("writer") # End node
# Or both at once:
builder.set_execution_bounds("researcher", "writer")
# Build the graph
graph = builder.build(compute_embeddings=True, encoder=my_encoder)
# Validate before building
is_valid, errors = builder.validate()
if not is_valid:
print(f"Errors: {errors}")
```
#### 3. build_from_adjacency (from a matrix)
```python
from builder import build_from_adjacency
import torch
adjacency = torch.tensor([
[0, 1, 0],
[0, 0, 1],
[0, 0, 0],
], dtype=torch.float32)
graph = build_from_adjacency(
adjacency_matrix=adjacency,
agents=[agent1, agent2, agent3],
query="Task",
threshold=0.1, # Ignore edges with weight < threshold
)
```
#### 4. build_from_schema (from a schema)
```python
from builder import build_from_schema
graph = build_from_schema(
schema=my_schema,
compute_embeddings=True,
encoder=my_encoder,
validate=True, # Validate before building
)
```
---
### Event System
Subscribe to events for monitoring and debugging.
```python
from core.events import (
EventBus,
global_event_bus,
EventType,
LoggingEventHandler,
MetricsEventHandler,
on_event,
# Events
NodeAddedEvent,
EdgeAddedEvent,
StepCompletedEvent,
BudgetWarningEvent,
)
# Get the global event bus
bus = global_event_bus()
# 1. Subscribe via a handler
logging_handler = LoggingEventHandler(
log_level="INFO",
include_metadata=True,
)
bus.subscribe(EventType.STEP_COMPLETED, logging_handler)
# 2. Subscribe via a function
def on_step_completed(event):
if isinstance(event, StepCompletedEvent):
print(f"Agent {event.agent_id} completed: {event.tokens_used} tokens")
bus.subscribe(EventType.STEP_COMPLETED, on_step_completed)
# 3. Subscribe via a decorator
@on_event(EventType.BUDGET_WARNING)
def handle_budget_warning(event: BudgetWarningEvent):
print(f"β οΈ Budget warning: {event.budget_type} at {event.ratio:.1%}")
# 4. Global subscription (all events)
@on_event(None)
def handle_all_events(event):
print(f"Event: {event.event_type.value}")
# Disable event handling
bus.disable()
# Enable
bus.enable()
# Clear all handlers
bus.clear()
# Aggregate metrics via events
metrics_handler = MetricsEventHandler()
bus.subscribe(None, metrics_handler)
# After execution
metrics = metrics_handler.get_metrics()
print(f"Total tokens: {metrics['total_tokens']}")
print(f"Errors: {metrics['errors_count']}")
print(f"Budget warnings: {metrics['budget_warnings']}")
```
---
### Callback system
Monitoring and logging execution via callback handlers.
#### Core concepts
- **`BaseCallbackHandler`** - base class for creating callback handlers
- **`AsyncCallbackHandler`** - async version for asynchronous operations
- **`CallbackManager`** - manager that orchestrates and invokes handlers
- **Built-in handlers** - StdoutCallbackHandler, MetricsCallbackHandler, FileCallbackHandler
#### Quick start
```python
from execution import MACPRunner
from callbacks import (
StdoutCallbackHandler,
MetricsCallbackHandler,
FileCallbackHandler,
)
# 1. Callbacks via RunnerConfig
from execution import RunnerConfig
config = RunnerConfig(
callbacks=[
StdoutCallbackHandler(show_outputs=True),
MetricsCallbackHandler(),
]
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
# 2. Per-run callbacks (override config)
result = runner.run_round(
graph,
callbacks=[FileCallbackHandler("execution_log.jsonl")]
)
```
#### Context Manager
```python
from callbacks import collect_metrics, trace_as_callback
# 1. Collect metrics
with collect_metrics() as metrics:
runner.run_round(graph)
print(f"Total tokens: {metrics.total_tokens}")
print(f"Total duration: {metrics.total_duration_ms}ms")
print(f"Runs completed: {metrics.runs_completed}")
print(f"Runs failed: {metrics.runs_failed}")
# Full statistics
all_metrics = metrics.get_metrics()
print(f"Agent calls: {all_metrics['agent_calls']}")
print(f"Errors: {all_metrics['errors_count']}")
# 2. Tracing with arbitrary handlers
from callbacks import StdoutCallbackHandler
with trace_as_callback(handlers=[StdoutCallbackHandler()]) as manager:
runner.run_round(graph)
# Callbacks are automatically applied to this run
```
#### Creating your own CallbackHandler
```python
from callbacks import BaseCallbackHandler
from uuid import UUID
class MySlackAlertHandler(BaseCallbackHandler):
"""Sends Slack alerts on errors."""
def on_run_start(
self,
*,
run_id: UUID,
query: str,
num_agents: int = 0,
**kwargs,
) -> None:
send_slack(f"π Started run {run_id}: {num_agents} agents")
def on_agent_end(
self,
*,
run_id: UUID,
agent_id: str,
output: str,
tokens_used: int = 0,
duration_ms: float = 0.0,
**kwargs,
) -> None:
print(f"β
Agent {agent_id}: {tokens_used} tokens, {duration_ms:.0f}ms")
def on_agent_error(
self,
error: BaseException,
*,
run_id: UUID,
agent_id: str,
**kwargs,
) -> None:
send_slack_alert(
f"β Agent {agent_id} failed in run {run_id}: {error}",
severity="high"
)
def on_run_end(
self,
*,
run_id: UUID,
output: str,
success: bool = True,
total_tokens: int = 0,
**kwargs,
) -> None:
if not success:
    send_slack_alert(f"💥 Run {run_id} failed!")
else:
    send_slack(f"✅ Run {run_id} completed: {total_tokens} tokens")
# Usage
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[MySlackAlertHandler()])
)
```
#### Async Callbacks
```python
from callbacks import AsyncCallbackHandler
import aiohttp
class AsyncWebhookHandler(AsyncCallbackHandler):
"""Asynchronously sends a webhook on events."""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
async def on_run_start(
self,
*,
run_id: UUID,
query: str,
**kwargs,
) -> None:
async with aiohttp.ClientSession() as session:
await session.post(
self.webhook_url,
json={"event": "run_start", "run_id": str(run_id), "query": query}
)
async def on_agent_end(
self,
*,
run_id: UUID,
agent_id: str,
output: str,
tokens_used: int = 0,
**kwargs,
) -> None:
async with aiohttp.ClientSession() as session:
await session.post(
self.webhook_url,
json={
"event": "agent_end",
"run_id": str(run_id),
"agent_id": agent_id,
"tokens": tokens_used,
}
)
# Usage with async runner
runner = MACPRunner(
async_llm_caller=my_async_llm,
config=RunnerConfig(callbacks=[AsyncWebhookHandler("https://api.example.com/webhook")])
)
result = await runner.arun_round(graph)
```
#### Built-in handlers
##### 1. StdoutCallbackHandler - console output
```python
from callbacks import StdoutCallbackHandler
handler = StdoutCallbackHandler(
color=True, # Colored output
show_prompts=False, # Show prompts
show_outputs=True, # Show agent outputs
truncate_length=200, # Output truncation length
)
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[handler])
)
# Output example:
# 🚀 Run started: 5 agents
#    Order: researcher → analyst → writer → editor → publisher
# ▶️ [0] Researcher started
# 🛠️ Tool 'web_search.search' started with args: {query: "market analysis"}
# ✅ Tool 'web_search.search' ended (1200ms, 3500 chars)
# ✅ [0] Researcher completed: 150 tokens, 1200ms
#    Output: Market analysis shows strong growth...
# ▶️ [1] Analyst started
# ✅ [1] Analyst completed: 200 tokens, 1500ms [FINAL]
# ✅ Run completed: 350 tokens, 2700ms
```
##### 2. MetricsCallbackHandler - metrics aggregation
```python
from callbacks import MetricsCallbackHandler
metrics_handler = MetricsCallbackHandler()
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[metrics_handler])
)
result = runner.run_round(graph)
# Retrieve metrics
metrics = metrics_handler.get_metrics()
print(f"Total tokens: {metrics['total_tokens']}")
print(f"Total duration: {metrics['total_duration_ms']}ms")
print(f"Agent calls: {metrics['agent_calls']}") # {'researcher': 1, 'writer': 1, ...}
print(f"Agent tokens: {metrics['agent_tokens']}") # {'researcher': 150, ...}
print(f"Errors: {metrics['errors_count']}")
print(f"Retries: {metrics['retries']}")
print(f"Budget warnings: {metrics['budget_warnings']}")
print(f"Runs completed: {metrics['runs_completed']}")
# Averages
print(f"Avg tokens per agent: {metrics['avg_tokens_per_agent']}")
# Tool metrics (WebSearchTool and other tools)
print(f"Tool calls: {metrics['tool_calls']}") # {'web_search.search': 3, 'web_search.fetch': 1}
print(f"Tool durations: {metrics['tool_durations']}") # {'web_search.search': 3600.0, ...}
print(f"Tool errors: {metrics['tool_errors_count']}") # 0
# Last 10 errors
for error in metrics['errors']:
print(f"Error in {error['agent_id']}: {error['error_message']}")
# Last 10 tool errors
for error in metrics['tool_errors']:
print(f"Tool error: {error['tool_name']}.{error['action']}: {error['error_message']}")
# Reset metrics
metrics_handler.reset()
```
##### 3. FileCallbackHandler - write to a JSON Lines file
```python
from callbacks import FileCallbackHandler
handler = FileCallbackHandler(
file_path="execution_log.jsonl",
append=True, # Append or overwrite
flush_every=1, # Flush after each event
)
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[handler])
)
result = runner.run_round(graph)
# Close the file manually (or it is closed automatically via __del__)
handler.close()
# File format (JSON Lines):
# {"event_type": "run_start", "timestamp": "2024-...", "run_id": "...", "query": "...", "num_agents": 5}
# {"event_type": "agent_start", "timestamp": "...", "run_id": "...", "agent_id": "researcher", ...}
# {"event_type": "agent_end", "timestamp": "...", "run_id": "...", "agent_id": "researcher", "tokens_used": 150, ...}
```
#### Available callback methods
| Method | Description | Parameters |
|-------|-------------|-----------|
| `on_run_start` | Run start | `run_id`, `query`, `num_agents`, `execution_order` |
| `on_run_end` | Run end | `run_id`, `output`, `success`, `error`, `total_tokens`, `total_time_ms`, `executed_agents` |
| `on_agent_start` | Agent started | `run_id`, `agent_id`, `agent_name`, `step_index`, `prompt`, `predecessors` |
| `on_agent_end` | Agent finished | `run_id`, `agent_id`, `output`, `tokens_used`, `duration_ms`, `is_final` |
| `on_agent_error` | Agent error | `error`, `run_id`, `agent_id`, `error_type`, `will_retry`, `attempt` |
| `on_retry` | Retry attempt | `run_id`, `agent_id`, `attempt`, `max_attempts`, `delay_ms`, `error` |
| `on_llm_new_token` | New token (streaming) | `token`, `run_id`, `agent_id`, `token_index`, `is_first`, `is_last` |
| `on_plan_created` | Plan created | `run_id`, `num_steps`, `execution_order` |
| `on_topology_changed` | Topology changed | `run_id`, `reason`, `old_remaining`, `new_remaining`, `change_count` |
| `on_prune` | Agent pruned | `run_id`, `agent_id`, `reason` |
| `on_fallback` | Fallback activated | `run_id`, `failed_agent_id`, `fallback_agent_id`, `reason` |
| `on_parallel_start` | Parallel group start | `run_id`, `agent_ids`, `group_index` |
| `on_parallel_end` | Parallel group end | `run_id`, `agent_ids`, `successful`, `failed` |
| `on_memory_read` | Memory read | `run_id`, `agent_id`, `entries_count`, `keys` |
| `on_memory_write` | Memory write | `run_id`, `agent_id`, `key`, `value_size` |
| `on_budget_warning` | Budget warning | `run_id`, `budget_type`, `current`, `limit`, `ratio` |
| `on_budget_exceeded` | Budget exceeded | `run_id`, `budget_type`, `current`, `limit`, `action_taken` |
| `on_tool_start` | Tool started | `run_id`, `tool_name`, `action`, `arguments` |
| `on_tool_end` | Tool finished | `run_id`, `tool_name`, `action`, `success`, `duration_ms`, `output_size`, `result_summary` |
| `on_tool_error` | Tool error | `run_id`, `tool_name`, `action`, `error_type`, `error_message` |
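As a small sketch of wiring up the less common hooks (the keyword names mirror the table above; the defaults and print targets are illustrative, not part of the library contract):
```python
from uuid import UUID
from callbacks import BaseCallbackHandler

class TopologyWatchHandler(BaseCallbackHandler):
    """Logs re-planning, pruning, and fallback decisions."""

    def on_topology_changed(self, *, run_id: UUID, reason: str = "",
                            old_remaining=None, new_remaining=None,
                            change_count: int = 0, **kwargs) -> None:
        print(f"Topology changed ({reason}): {change_count} change(s)")

    def on_prune(self, *, run_id: UUID, agent_id: str, reason: str = "", **kwargs) -> None:
        print(f"Pruned agent {agent_id}: {reason}")

    def on_fallback(self, *, run_id: UUID, failed_agent_id: str,
                    fallback_agent_id: str, reason: str = "", **kwargs) -> None:
        print(f"Fallback: {failed_agent_id} -> {fallback_agent_id} ({reason})")
```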
#### Tool Callback Events
Tools emit events via the callback system. This lets you monitor all tool actions without direct logging.
**Event types:**
| Event | Class | Description |
|------|-------|-------------|
| `TOOL_START` | `ToolStartEvent` | Tool action started |
| `TOOL_END` | `ToolEndEvent` | Tool action successfully completed |
| `TOOL_ERROR` | `ToolErrorEvent` | Tool action failed |
**Example: handling tool events**
```python
from callbacks import BaseCallbackHandler, CallbackManager
from tools import WebSearchTool
from uuid import UUID
class ToolMonitorHandler(BaseCallbackHandler):
"""Monitor all tool actions."""
def on_tool_start(
self,
*,
run_id: UUID,
tool_name: str,
action: str,
arguments: dict,
**kwargs,
) -> None:
print(f"[TOOL] {tool_name}.{action} started with {arguments}")
def on_tool_end(
self,
*,
run_id: UUID,
tool_name: str,
action: str,
success: bool = True,
duration_ms: float = 0.0,
output_size: int = 0,
result_summary: str = "",
**kwargs,
) -> None:
status = "OK" if success else "FAIL"
print(f"[TOOL] {tool_name}.{action} {status} ({duration_ms:.0f}ms, {output_size} chars)")
def on_tool_error(
self,
error: BaseException | None = None,
*,
run_id: UUID,
tool_name: str,
action: str,
error_type: str = "",
error_message: str = "",
**kwargs,
) -> None:
print(f"[TOOL ERROR] {tool_name}.{action}: {error_type} - {error_message}")
# Usage
cb = CallbackManager(handlers=[ToolMonitorHandler()])
tool = WebSearchTool(callback_manager=cb)
tool.execute(query="Python tutorials")
# [TOOL] web_search.search started with {'query': 'Python tutorials'}
# [TOOL] web_search.search OK (1200ms, 3500 chars)
```
**Built-in handlers already support tool events:**
- `StdoutCallbackHandler` - prints tool events to console with emoji
- `MetricsCallbackHandler` - collects metrics for tool_calls, tool_durations, tool_errors
#### Ignore flags
You can disable specific event types:
```python
class MyMinimalHandler(BaseCallbackHandler):
# Ignore most events
ignore_llm = True # Do not call on_llm_new_token
ignore_retry = True # Do not call on_retry
ignore_budget = True # Do not call on_budget_*
ignore_memory = True # Do not call on_memory_*
ignore_tool = True # Do not call on_tool_start/end/error
# Handle only errors
def on_agent_error(self, error, *, run_id, agent_id, **kwargs):
log_critical_error(agent_id, error)
```
#### Combining handlers
```python
from callbacks import (
StdoutCallbackHandler,
MetricsCallbackHandler,
FileCallbackHandler,
)
# You can use multiple handlers at the same time
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[
StdoutCallbackHandler(show_outputs=False), # Only status to console
MetricsCallbackHandler(), # Metrics collection
FileCallbackHandler("debug.jsonl"), # Full log to file
MySlackAlertHandler(), # Slack alerts
])
)
```
---
### State Storage
Persistent storage for node states.
```python
from utils.state_storage import (
InMemoryStateStorage,
FileStateStorage,
)
# 1. In-memory storage
storage = InMemoryStateStorage()
storage.save("agent_id", {"messages": [...], "context": {...}})
state = storage.load("agent_id")
storage.delete("agent_id")
all_keys = storage.keys()
storage.clear()
# 2. File-based storage
storage = FileStateStorage(directory="./agent_states")
storage.save("researcher", {
"messages": [{"role": "user", "content": "Hello"}],
"iteration": 5,
})
state = storage.load("researcher")
if state:
print(f"Iteration: {state['iteration']}")
storage.delete("researcher")
# Get all stored IDs
all_agent_ids = storage.keys()
# Clear all states
storage.clear()
```
---
### Async Utils
Helper functions for asynchronous execution.
```python
import asyncio

from utils.async_utils import (
run_sync,
gather_with_concurrency,
timeout_wrapper,
)
# 1. Run a coroutine synchronously
async def my_async_function():
return "result"
result = run_sync(my_async_function(), context="my_context")
# 2. Parallel execution with a concurrency limit
async def fetch_data(agent_id: str):
# ... async call ...
return response
async def main():
tasks = [fetch_data(f"agent_{i}") for i in range(20)]
# Run no more than 5 at once
results = await gather_with_concurrency(5, *tasks)
return results
# 3. Timeouts
async def slow_operation():
await asyncio.sleep(10)
return "done"
async def main():
try:
result = await timeout_wrapper(
slow_operation(),
timeout=5.0,
error_message="Operation took too long",
)
except TimeoutError as e:
print(f"Timeout: {e}")
```
---
### Conditional Routing
Dynamic selection of the next agent based on conditions.
```python
from core.graph import ConditionalEdge
from execution.scheduler import ConditionContext, ConditionEvaluator
# 1. Define conditional edges
def quality_above_threshold(context: ConditionContext) -> bool:
"""Go to editor only if quality > 0.8"""
quality = context.state.get("quality_score", 0)
return quality > 0.8
def has_errors(context: ConditionContext) -> bool:
"""Go to fixer if there are errors"""
return "errors" in context.state and len(context.state["errors"]) > 0
# Add conditional edges to the graph
graph.add_conditional_edge(
source="writer",
targets={
"editor": quality_above_threshold,
"fixer": has_errors,
},
default="reviewer", # Fallback if no condition matches
)
# 2. Use via the builder
from builder import GraphBuilder
builder = GraphBuilder()
builder.add_agent(agent_id="writer", display_name="Writer")
builder.add_agent(agent_id="editor", display_name="Editor")
builder.add_agent(agent_id="fixer", display_name="Fixer")
builder.add_conditional_edge(
source="writer",
target="editor",
condition=quality_above_threshold,
weight=0.9,
)
builder.add_conditional_edge(
source="writer",
target="fixer",
condition=has_errors,
weight=0.7,
)
graph = builder.build()
# 3. Evaluate conditions at runtime
evaluator = ConditionEvaluator()
context = ConditionContext(
current_node="writer",
state={"quality_score": 0.85, "errors": []},
history=["researcher", "writer"],
metadata={"iteration": 1},
)
# Evaluate a single condition
if evaluator.evaluate(quality_above_threshold, context):
next_node = "editor"
# Evaluate all conditions for a node
next_nodes = evaluator.evaluate_all(graph, "writer", context)
print(f"Next nodes: {next_nodes}")
```
---
### Agent Tools
The `tools` module allows agents to use external tools via Native Function Calling.
**Key principle:** If an agent has tools specified, they are **ALWAYS** used automatically on every LLM call.
**Built-in tools:**
- `shell` - execute shell commands
- `code_interpreter` - execute Python code in a sandbox
- `file_search` - search files and their contents
- `web_search` - search the web (DuckDuckGo, Serper, Tavily) plus a Selenium browser for dynamic pages
- `function_calling` - call custom functions
#### Quick start
```python
from builder import GraphBuilder
from execution import MACPRunner
from tools import tool, OpenAIToolsCaller
from openai import OpenAI
# 1. Register tools via the @tool decorator
@tool
def fibonacci(n: int) -> str:
"""Calculate the n-th Fibonacci number."""
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return str(a)
@tool
def is_prime(n: int) -> str:
"""Check if a number is prime."""
if n < 2:
return "False"
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return "False"
return "True"
# 2. Create an agent with tools
builder = GraphBuilder()
builder.add_agent(
agent_id="math",
display_name="Math Agent",
persona="a helpful math assistant",
tools=["fibonacci", "is_prime"], # <-- tools are specified here!
)
builder.add_task(query="Calculate fibonacci(20) and check if it's prime")
builder.connect_task_to_agents(agent_ids=["math"])
# 3. Create caller and runner
client = OpenAI(api_key="...")
caller = OpenAIToolsCaller(client, model="gpt-4")
runner = MACPRunner(llm_caller=caller)
# 4. Run - tools are used AUTOMATICALLY
result = runner.run_round(builder.build())
print(result.final_answer)
```
**Important:**
- Tools are set when creating an agent via the `tools` parameter
- Runner automatically passes tools to the LLM via the API
- No `enable_tools` flags are needed - it works automatically
#### Two ways to register tools
**Method 1: Global `@tool` decorator (recommended)**
```python
from tools import tool
@tool
def calculate(expression: str) -> str:
"""Evaluate a math expression."""
return str(eval(expression))
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
return f"Results for: {query}"
```
**Method 2: Via ToolRegistry**
```python
from tools import ToolRegistry, get_registry
# Global registry
registry = get_registry()
@registry.function
def my_tool(arg: str) -> str:
"""Description for the LLM."""
return arg.upper()
# Or create your own registry
my_registry = ToolRegistry()
@my_registry.function
def custom_tool(x: int) -> str:
return str(x * 2)
```
#### Passing tools as objects
You can pass BaseTool objects directly into AgentProfile:
```python
from core.agent import AgentProfile
from tools import CodeInterpreterTool, ShellTool
# Create an agent with tool objects
agent = AgentProfile(
agent_id="coder",
display_name="Code Agent",
persona="a Python programmer",
tools=[CodeInterpreterTool(timeout=10), ShellTool()], # <-- objects!
)
# Add to the graph
builder = GraphBuilder()
builder.add_agent_profile(agent)
```
#### Supported tools
| Tool | Description |
|------|-------------|
| `shell` | Execute shell commands |
| `function_calling` | Call registered Python functions (grouped) |
| `code_interpreter` | Execute Python code in a sandbox |
| `file_search` | Search files and file contents in directories |
#### Base classes
```python
from tools import (
BaseTool, # Abstract base class for tools
ToolCall, # A tool-call request (parsed from LLM output)
ToolResult, # Tool execution result
ToolRegistry, # Tool registry
ShellTool, # Tool for shell commands
FunctionTool, # Tool for calling (grouped) functions
CodeInterpreterTool, # Tool for executing Python code
FileSearchTool, # Tool for searching files
)
```
#### ShellTool - executing shell commands
```python
from tools import ShellTool, ToolRegistry
# Create a ShellTool with safety settings
shell_tool = ShellTool(
timeout=30, # Timeout in seconds
max_output_size=8192, # Max output size
working_dir="/path/to/dir", # Working directory (optional)
allowed_commands=["echo", "ls", "pwd"], # Command allowlist (optional)
)
# Register in a registry
registry = ToolRegistry()
registry.register(shell_tool)
# Execute directly
result = shell_tool.execute(command="echo Hello World")
print(result.success) # True
print(result.output) # "Hello World"
# Or via the registry
from tools import ToolCall
call = ToolCall(name="shell", arguments={"command": "ls -la"})
result = registry.execute(call)
```
#### FunctionTool - calling custom functions
```python
from tools import FunctionTool, ToolRegistry
# Create a FunctionTool
func_tool = FunctionTool()
# Register functions via decorator
@func_tool.register
def calculate(expression: str) -> str:
"""Evaluate a math expression."""
return str(eval(expression))
@func_tool.register
def uppercase(text: str) -> str:
"""Convert text to uppercase."""
return text.upper()
@func_tool.register(name="word_count", description="Count words in text")
def count_words(text: str) -> int:
"""Count words."""
return len(text.split())
# Register in the registry
registry = ToolRegistry()
registry.register(func_tool)
# Call a function
result = func_tool.execute(function="calculate", expression="2 ** 10")
print(result.output) # "1024"
# List registered functions
print(func_tool.list_functions()) # ['calculate', 'uppercase', 'word_count']
```
#### Two ways to register functions
There are two ways to register functions as tools:
**Method 1: Via FunctionTool (grouped functions)**
Functions are grouped under a single tool named `function_calling`. The LLM must call them like this:
```json
{"name": "function_calling", "arguments": {"function": "calculate", "expression": "2+2"}}
```
```python
func_tool = FunctionTool()
@func_tool.register
def calculate(expression: str) -> str:
return str(eval(expression))
registry.register(func_tool)
```
**Method 2: Via `@registry.function` (separate tools) - RECOMMENDED**
Each function becomes a separate tool. The LLM calls them directly:
```json
{"name": "calculate", "arguments": {"expression": "2+2"}}
```
```python
@registry.function
def calculate(expression: str) -> str:
return str(eval(expression))
@registry.function
def fibonacci(n: int) -> str:
"""Calculate the n-th Fibonacci number."""
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return str(a)
```
**Recommendation:** Use `@registry.function` - it is simpler for the LLM and avoids confusion with nested arguments.
#### CodeInterpreterTool - executing Python code
Allows agents to execute arbitrary Python code in a safe sandbox environment.
```python
from tools import CodeInterpreterTool, ToolRegistry, ToolCall
# Create a CodeInterpreterTool
code_tool = CodeInterpreterTool(
timeout=30, # Execution timeout in seconds
max_output_size=8192, # Maximum output size
safe_mode=True, # Restricted builtins for safety
)
# Register
registry = ToolRegistry()
registry.register(code_tool)
# Example 1: Simple computation
result = code_tool.execute(code="2 ** 10 + sum(range(5))")
print(result.output) # "1034"
# Example 2: Multi-line code with functions
code = """
def fibonacci(n):
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return a
for i in range(10):
print(f"fib({i}) = {fibonacci(i)}")
"""
result = code_tool.execute(code=code)
print(result.output)
# fib(0) = 0
# fib(1) = 1
# fib(2) = 1
# ...
# Example 3: Using preloaded modules
# Available in sandbox: math, statistics, json, re, datetime,
# collections, itertools, functools, random
result = code_tool.execute(code="""
# Modules are already loaded; no import needed
print(f"pi = {math.pi:.6f}")
print(f"e = {math.e:.6f}")
data = {"name": "Alice", "age": 30}
print(json.dumps(data, indent=2))
""")
print(result.output)
# Example 4: Error handling
result = code_tool.execute(code="1 / 0")
print(result.success) # False
print(result.error) # "ZeroDivisionError: division by zero"
```
**Safety:**
- With `safe_mode=True`, built-in functions are restricted
- Forbidden: `open`, `exec`, `eval`, `__import__`, `compile`
- Only safe modules are available
- Timeout prevents infinite loops
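A quick illustration of `safe_mode` (hedged: the exact error text depends on the sandbox implementation; the point is that a forbidden builtin surfaces as a failed result rather than an exception):
```python
result = code_tool.execute(code="open('/etc/passwd').read()")
print(result.success)  # False - 'open' is stripped from the sandbox builtins
print(result.error)    # e.g. "NameError: name 'open' is not defined"
```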
#### FileSearchTool - searching files and contents
Allows agents to search files by name, search text within files, and read file contents.
```python
from tools import FileSearchTool, ToolRegistry, ToolCall
# Create a FileSearchTool
file_tool = FileSearchTool(
base_directory="./project", # Base directory to search within
max_results=50, # Maximum number of results
max_depth=10, # Maximum recursion depth
max_file_size=100_000, # Max file size for content search
max_read_size=10_000, # Max size for reading a file
allowed_extensions=[".py", ".txt", ".md"], # Allowed extensions (optional)
)
registry = ToolRegistry()
registry.register(file_tool)
# Example 1: Find all Python files
result = file_tool.execute(pattern="*.py")
print(result.output)
# Found 15 file(s) matching '*.py':
# src/main.py (1,234 bytes)
# src/utils.py (567 bytes)
# ...
# Example 2: Search in a specific directory
result = file_tool.execute(pattern="test_*.py", directory="tests")
print(result.output)
# Example 3: Search within file contents
result = file_tool.execute(pattern="*.py", query="def main")
print(result.output)
# Search results for 'def main' in 15 file(s):
# Found 3 match(es).
# === src/main.py ===
# 42: def main():
# === src/cli.py ===
# 15: def main_entry():
# ...
# Example 4: Regex search
result = file_tool.execute(pattern="*.py", query=r"def \w+_handler", regex=True)
# Example 5: Read a specific file
result = file_tool.execute(read_file="src/config.py")
print(result.output)
# === src/config.py ===
# """Configuration module."""
# import os
# ...
# Example 6: Via ToolCall (how the LLM calls it)
call = ToolCall(
name="file_search",
arguments={"pattern": "*.py", "query": "class Agent"}
)
result = registry.execute(call)
```
**Safety:**
- Cannot escape outside `base_directory`
- Hidden files and directories (starting with `.`) are skipped
- File size limits prevent reading huge files
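A quick check of the directory sandbox (hedged: whether this returns a failed result or an error message in `result.output` depends on the implementation; the path is illustrative):
```python
# Attempting to escape the base directory should fail safely
result = file_tool.execute(read_file="../../etc/passwd")
print(result.success)  # False - paths outside base_directory are rejected
```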
#### WebSearchTool - searching, reading, and interacting with web pages
A tool for working with the internet: search (DuckDuckGo/Serper/Tavily), fetching pages, and full interaction via Selenium (clicks, forms, JS, crawl).
> **Install Selenium** (optional):
> ```bash
> pip install selenium webdriver-manager
> ```
##### Quick start
**Method 1 - dict config (recommended):**
```python
from builder import GraphBuilder
from execution import MACPRunner
builder = GraphBuilder()
builder.add_agent(
"researcher",
persona="research assistant",
# Dict config - the tool is created automatically with the desired parameters
tools=[{"name": "web_search", "use_selenium": True, "fetch_content": True}],
)
builder.add_task(query="Find information about Python 3.12")
builder.connect_task_to_agents(agent_ids=["researcher"])
graph = builder.build()
runner = MACPRunner(llm_caller=my_caller)
result = runner.run_round(graph)
```
**Method 2 - registry registration:**
```python
from tools import WebSearchTool, get_registry
registry = get_registry()
registry.register(WebSearchTool(use_selenium=True, fetch_content=True))
# Agent references it by name
builder.add_agent("researcher", tools=["web_search"])
```
**Method 3 - pass the object directly:**
```python
from tools import WebSearchTool
builder.add_agent(
"researcher",
tools=[WebSearchTool(use_selenium=True)],
)
```
##### Dict config parameters
```python
tools=[{
"name": "web_search",
# All WebSearchTool constructor parameters:
"use_selenium": True,
"fetch_content": True,
"max_results": 5,
"timeout": 15,
"max_content_length": 4000,
"selenium_config": {
"headless": True,
"browser": "edge", # "chrome", "firefox", "edge"
"extra_wait": 1.0,
"disable_images": True,
"page_load_timeout": 30,
},
# Provider by string:
# "provider": "serper", # "duckduckgo", "serper", "tavily"
# "api_key": "...",
}]
```
The browser is detected automatically. If `webdriver-manager` cannot download a driver (no internet, SSL error), a system driver is used.
##### Actions (the `action` parameter)
`action` is a command that defines what to do. All actions run within the same browser session.
| action | Description | Required parameters |
|--------|-------------|---------------------|
| `search` | Web search | `query` |
| `fetch` | Open and read a page | `url` |
| `click` | Click an element | `selector` |
| `fill` | Fill an input | `selector`, `value` |
| `extract_links` | Extract links from a page | - |
| `execute_js` | Execute JavaScript | `js_code` |
| `crawl` | Recursive site crawl | `url` |
| `get_content` | Text of the current page | - |
`search` and `fetch` work without Selenium. The rest require `use_selenium=True`.
If `action` is not provided, it is inferred automatically: `query` → search, `url` → fetch, `selector` → click, `js_code` → execute_js.
##### Action examples
```python
from tools import WebSearchTool
with WebSearchTool(use_selenium=True) as tool:
    # Search
    result = tool.execute(action="search", query="Python tutorials")
    # Fetch a page (wait for an element)
    result = tool.execute(action="fetch", url="https://example.com", wait_for_selector="h1")
    # Click
    result = tool.execute(action="click", selector="a.nav-link")
    # Fill a form and submit
    result = tool.execute(action="fill", selector="input[name=q]", value="Python", submit=True)
    # Extract links
    result = tool.execute(action="extract_links", url="https://example.com")
    # Execute JS
    result = tool.execute(action="execute_js", js_code="return document.title")
    # Crawl
    result = tool.execute(action="crawl", url="https://docs.python.org", max_depth=2, max_pages=5)
    # Current page text
    result = tool.execute(action="get_content")
```
##### Search providers
| Provider | API key | Description |
|----------|---------|-------------|
| `DuckDuckGoProvider` | No | Default, free |
| `SerperProvider` | Yes (serper.dev) | Google Search |
| `TavilyProvider` | Yes (tavily.com) | With AI summarization |
```python
# Via dict config
tools=[{"name": "web_search", "provider": "tavily", "api_key": "tvly-..."}]
# Or directly
from tools import WebSearchTool, TavilyProvider
tool = WebSearchTool(provider=TavilyProvider(api_key="tvly-..."))
```
Custom provider:
```python
from tools import WebSearchTool, SearchProvider
class MyProvider(SearchProvider):
    def search(self, query: str, max_results: int = 5) -> list[dict[str, str]]:
        return [{"title": "Result", "url": "https://example.com", "snippet": query}]

tool = WebSearchTool(provider=MyProvider())
```
##### Constructor parameters
| Parameter | Type | Default | Description |
|----------|------|---------|-------------|
| `provider` | `SearchProvider \| None` | `DuckDuckGoProvider` | Search provider |
| `max_results` | `int` | `5` | Max search results |
| `max_content_length` | `int` | `4000` | Max page content length |
| `fetch_content` | `bool` | `False` | Fetch page contents during search |
| `timeout` | `int` | `15` | Request timeout (sec) |
| `use_selenium` | `bool` | `False` | Use Selenium |
| `selenium_config` | `dict \| None` | `None` | Selenium settings (headless, browser, extra_wait, etc.) |
| `selenium_fetcher` | `SeleniumFetcher \| None` | `None` | A pre-built SeleniumFetcher instance |
| `callback_manager` | `CallbackManager \| None` | `None` | For events (if None — taken from context) |
##### execute() parameters
| Parameter | Type | Description |
|----------|------|-------------|
| `action` | `str` | Action (see table above). Auto-inferred if omitted |
| `query` | `str` | Search query |
| `url` | `str` | Page URL |
| `selector` | `str` | CSS selector |
| `value` | `str` | Value for fill |
| `submit` | `bool` | Submit the form (default: False) |
| `js_code` | `str` | JavaScript code |
| `max_pages` | `int` | Max pages for crawl (default: 10) |
| `max_depth` | `int` | Max crawl depth (default: 2) |
| `url_filter` | `str` | Regex filter for crawl URLs |
| `fetch_content` | `bool` | Fetch contents (for search) |
| `max_results` | `int` | Max results (for search) |
| `wait_for_selector` | `str` | CSS selector to wait for page readiness |
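To illustrate the crawl-specific parameters together, a sketch (the URL and regex are placeholders; `url_filter` restricts which links are followed, per the table above):
```python
from tools import WebSearchTool

tool = WebSearchTool(use_selenium=True)
# Crawl up to 5 pages, depth 2, following only library-reference URLs
result = tool.execute(
    action="crawl",
    url="https://docs.python.org",
    max_pages=5,
    max_depth=2,
    url_filter=r"/3/library/",
)
```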
##### Callback integration
WebSearchTool emits `on_tool_start`/`on_tool_end`/`on_tool_error` events via the callback system:
```python
from callbacks import CallbackManager, StdoutCallbackHandler
from tools import WebSearchTool
cb = CallbackManager(handlers=[StdoutCallbackHandler()])
tool = WebSearchTool(callback_manager=cb, use_selenium=True)
tool.execute(action="fetch", url="https://example.com")
# 🛠️ Tool 'web_search.fetch' started
# ✅ Tool 'web_search.fetch' ended (1200ms)
```
##### Notes
- Two modes: `urllib` (no dependencies) and Selenium (full browser)
- Browsers: Chrome, Firefox, Edge (automatic fallback to system driver)
- Context manager: `with WebSearchTool(...) as tool:` — auto-closes the browser
- Built-in HTML parser without external dependencies
- `create_tool_from_config()` — build from dict config for agent integration (sketch below)
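The exact signature of `create_tool_from_config()` is not shown in this section, so treat this as an assumption-laden sketch: the function is expected to accept the same dict config used in `tools=[...]`:
```python
from tools import create_tool_from_config  # import location assumed

# Build a WebSearchTool from the same dict config shown above
tool = create_tool_from_config({"name": "web_search", "use_selenium": True})
```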
#### ToolRegistry — tool registry
```python
from tools import ToolRegistry, ShellTool, FunctionTool
# Create a registry
registry = ToolRegistry()
# Register tools
registry.register(ShellTool(timeout=10))
registry.register(FunctionTool())
# Register functions via the registry decorator (convenient)
@registry.function
def greet(name: str) -> str:
    """Greeting."""
    return f"Hello, {name}!"

@registry.function(name="add", description="Add two numbers")
def add_numbers(a: int, b: int) -> int:
    return a + b
# Check tool presence
print(registry.has("shell")) # True
print(registry.has("greet")) # True
# List tools
print(registry.list_tools()) # ['shell', 'function_calling', 'greet', 'add']
# Get tools for an agent
tools = registry.get_tools_for_agent(["shell", "greet"])
print([t.name for t in tools]) # ['shell', 'greet']
# Format a prompt with tool descriptions
prompt = registry.format_tools_prompt(["shell", "greet"])
print(prompt)
# Available tools:
# - shell: Execute a shell command...
# - greet: Greeting.
# To use a tool, format your response as:
# <tool_call>{"name": "tool_name", "arguments": {...}}</tool_call>
```
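A registered tool can also be invoked directly through the registry; a minimal sketch using the `add` function registered above:
```python
from tools import ToolCall

result = registry.execute(ToolCall(name="add", arguments={"a": 2, "b": 3}))
print(result.success, result.output)  # True 5
```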
#### Parsing tool_call from an LLM response
An agent can call a tool by including a special tag in its response:
```python
from tools import ToolCall
# LLM returns a response with tool calls
llm_response = """
I need to compute the result.
<tool_call>
{"name": "calculate", "arguments": {"expression": "2 + 2"}}
</tool_call>
And also check the directory:
<tool_call>
{"name": "shell", "arguments": {"command": "ls"}}
</tool_call>
"""
# Parse all calls
calls = ToolCall.parse_from_response(llm_response)
print(len(calls)) # 2
print(calls[0].name) # "calculate"
print(calls[0].arguments) # {"expression": "2 + 2"}
# Execute all calls
results = registry.execute_all(calls)
for result in results:
    print(f"{result.tool_name}: {result.output if result.success else result.error}")
```
#### Integration with MACPRunner
Tools are used **automatically** — just specify them when creating the agent.
```python
from execution import MACPRunner, RunnerConfig
from builder import GraphBuilder
from tools import (
tool, get_registry, register_tool,
ShellTool, CodeInterpreterTool, FileSearchTool,
OpenAIToolsCaller,
)
from openai import OpenAI
# 1. Register built-in tools
register_tool(ShellTool(timeout=10))
register_tool(CodeInterpreterTool(timeout=10, safe_mode=True))
register_tool(FileSearchTool(base_directory="."))
# Register custom functions via @tool
@tool
def get_current_time() -> str:
    """Get current date and time."""
    from datetime import datetime
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

@tool
def calculate(expression: str) -> str:
    """Evaluate a math expression (eval with empty builtins; not a full sandbox)."""
    return str(eval(expression, {"__builtins__": {}}, {}))
# 2. Create a graph with agents
builder = GraphBuilder()
builder.add_agent(
    "assistant",
    display_name="AI Assistant",
    persona="Helpful assistant who uses tools to solve problems.",
    tools=["shell", "get_current_time"],  # <-- tools are used automatically!
)
builder.add_agent(
    "coder",
    display_name="Python Coder",
    persona="Python expert who writes and executes code.",
    tools=["code_interpreter"],
)
builder.add_agent(
    "calculator",
    display_name="Calculator Agent",
    persona="Math expert who calculates expressions.",
    tools=["calculate"],
)
builder.add_workflow_edge("assistant", "calculator")
builder.add_task(query="What is 25 * 17 and what time is it?")
builder.connect_task_to_agents()
graph = builder.build()
# 3. Create caller and runner
client = OpenAI(api_key="...")
caller = OpenAIToolsCaller(client, model="gpt-4")
runner = MACPRunner(llm_caller=caller) # No extra configuration needed!
# 4. Execute β tools are used automatically
result = runner.run_round(graph)
print(result.final_answer)
```
**Note:** The `max_tool_iterations` parameter in `RunnerConfig` limits the number of tool-calling loops (default is 3).
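For example, to allow more tool-calling rounds per agent step (a minimal sketch; `caller` is the `OpenAIToolsCaller` from the block above):
```python
from execution import MACPRunner, RunnerConfig

config = RunnerConfig(max_tool_iterations=5)  # default: 3
runner = MACPRunner(llm_caller=caller, config=config)
```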
#### Creating a custom tool
```python
from tools import BaseTool, ToolCall, ToolRegistry, ToolResult
from typing import Any

class WeatherTool(BaseTool):
    """A tool for getting weather."""

    @property
    def name(self) -> str:
        return "weather"

    @property
    def description(self) -> str:
        return "Get current weather for a city"

    @property
    def parameters_schema(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "City name"
                }
            },
            "required": ["city"]
        }

    def execute(self, city: str = "", **kwargs) -> ToolResult:
        if not city:
            return ToolResult(
                tool_name=self.name,
                success=False,
                error="City is required"
            )
        # A real API call would go here
        weather = f"Sunny, 22°C in {city}"
        return ToolResult(
            tool_name=self.name,
            success=True,
            output=weather
        )

# Usage
registry = ToolRegistry()
registry.register(WeatherTool())
result = registry.execute(ToolCall(name="weather", arguments={"city": "Moscow"}))
print(result.output)  # "Sunny, 22°C in Moscow"
```
#### Example: full workflow with tools
```python
"""Full example of using tools in a multi-agent system."""
import math
from execution import MACPRunner, RunnerConfig
from builder import GraphBuilder
from tools import (
ToolRegistry,
ShellTool,
CodeInterpreterTool,
FileSearchTool,
)
# Configure tools
registry = ToolRegistry()
# Shell with allowlist
registry.register(ShellTool(
    timeout=5,
    allowed_commands=["echo", "date", "pwd", "ls"]
))
# Code interpreter to execute Python code
registry.register(CodeInterpreterTool(timeout=10, safe_mode=True))
# File search to find files
registry.register(FileSearchTool(base_directory=".", max_results=20))
# Math functions — register directly via @registry.function
# This allows the LLM to call them by name: {"name": "sqrt", "arguments": {"x": 144}}
@registry.function
def sqrt(x: float) -> float:
    """Calculate square root."""
    return math.sqrt(x)

@registry.function
def power(base: float, exp: float) -> float:
    """Calculate base^exp."""
    return math.pow(base, exp)

@registry.function
def factorial(n: int) -> int:
    """Calculate factorial."""
    return math.factorial(n)
# Build the graph
builder = GraphBuilder()
builder.add_agent(
    "math_solver",
    persona="Expert mathematician",
    tools=["sqrt", "power", "factorial"],  # Direct access to functions
)
builder.add_agent(
    "coder",
    persona="Python developer",
    tools=["code_interpreter"],  # Execute Python code
)
builder.add_agent(
    "researcher",
    persona="Code researcher",
    tools=["file_search"],  # Search files
)
builder.add_agent(
    "coordinator",
    persona="Task coordinator that combines results",
    tools=[],  # No tools
)
builder.add_workflow_edge("math_solver", "coordinator")
builder.add_workflow_edge("coder", "coordinator")
builder.add_workflow_edge("researcher", "coordinator")
builder.add_task(query="Calculate sqrt(144), then write Python to verify")
builder.connect_task_to_agents()
graph = builder.build()
# Execute
def mock_llm(prompt: str) -> str:
    if "mathematician" in prompt:
        return '''I'll calculate the square root.
<tool_call>
{"name": "sqrt", "arguments": {"x": 144}}
</tool_call>
'''
    elif "developer" in prompt:
        return '''Let me verify with Python code.
<tool_call>
{"name": "code_interpreter", "arguments": {"code": "import math\\nprint(f'sqrt(144) = {math.sqrt(144)}')"}}
</tool_call>
'''
    elif "researcher" in prompt:
        return '''Let me find Python files.
<tool_call>
{"name": "file_search", "arguments": {"pattern": "*.py", "directory": "src"}}
</tool_call>
'''
    else:
        return "Based on the results: sqrt(144) = 12 and we're in the current directory."
config = RunnerConfig(enable_tools=True, max_tool_iterations=2)
runner = MACPRunner(llm_caller=mock_llm, tool_registry=registry, config=config)
result = runner.run_round(graph)
print("Final:", result.final_answer)
```
#### Running the example
```bash
# Run the tools example
uv run python examples/tools_example.py
# Run tests
uv run pytest tests/test_tools.py -v
```
---
## API Reference
### Core classes
| Class | Description | Pydantic |
|-------|-------------|----------|
| `RoleGraph` | Role/agent graph with adjacency matrices | ❌ |
| `AgentProfile` | **Pydantic BaseModel** — Immutable agent profile | ✅ |
| `TaskNode` | **Pydantic BaseModel** — Virtual task node | ✅ |
| `NodeEncoder` | Text-to-embeddings encoder | ❌ |
| `MACPRunner` | MACP protocol executor | ❌ |
| `AdaptiveScheduler` | Adaptive scheduler | ❌ |
| `LLMCallerFactory` | Factory for creating LLM callers (multi-model) | ❌ |
| `LLMConfig` | **Pydantic BaseModel** — LLM configuration for schemas | ✅ |
| `AgentLLMConfig` | **Pydantic BaseModel** — LLM configuration for AgentProfile | ✅ |
| `AgentMemory` | Agent memory manager | ❌ |
| `SharedMemoryPool` | Shared memory pool | ❌ |
| `BudgetTracker` | Token/request budget tracker | ❌ |
| `MetricsTracker` | Performance metrics tracker | ❌ |
| `GraphVisualizer` | Graph visualization | ❌ |
| `BaseCallbackHandler` | Base callback handler | ❌ |
| `AsyncCallbackHandler` | Async callback handler | ❌ |
| `CallbackManager` | Callback handlers manager | ❌ |
| `AsyncCallbackManager` | Async callbacks manager | ❌ |
| `StdoutCallbackHandler` | Console event output | ❌ |
| `MetricsCallbackHandler` | Execution metrics aggregation | ❌ |
| `FileCallbackHandler` | Write events to JSON Lines file | ❌ |
| `EventBus` | Event bus for graph monitoring | ❌ |
| `EarlyStopCondition` | Early stopping condition | ❌ |
| `StepContext` | **Pydantic BaseModel** — Step context for hooks | ✅ |
| `TopologyAction` | **Pydantic BaseModel** — Topology modification action | ✅ |
### Schemas (Pydantic BaseModel)
| Schema class | Description | Usage |
|-------------|-------------|-------|
| `GraphSchema` | **Pydantic** — Full graph schema | Validation, serialization, migration |
| `BaseNodeSchema` | **Pydantic** — Base node schema | Parent class for nodes |
| `AgentNodeSchema` | **Pydantic** — Agent node schema | LLM config, tools, metrics, embeddings |
| `TaskNodeSchema` | **Pydantic** — Task node schema | Query, status, deadline |
| `BaseEdgeSchema` | **Pydantic** — Base edge schema | Weight, probability, cost |
| `WorkflowEdgeSchema` | **Pydantic** — Workflow edge | Conditions, priority, transforms |
| `CostMetrics` | **Pydantic** — Cost metrics | Tokens, latency, trust, reliability |
| `ValidationResult` | **Pydantic** — Validation result | Errors, warnings |
### Visualization (Pydantic BaseModel)
| Class | Description | Usage |
|-------|-------------|-------|
| `VisualizationStyle` | **Pydantic** — Global visualization style | Configure colors, shapes, what to show |
| `NodeStyle` | **Pydantic** — Node style | Shape, fill_color, stroke_color, icon |
| `EdgeStyle` | **Pydantic** — Edge style | Line style, arrow, colors |
| `NodeShape` | Enum — Node shapes | RECTANGLE, ROUND, STADIUM, CIRCLE, DIAMOND, etc. |
| `MermaidDirection` | Enum — Graph direction | TOP_BOTTOM, LEFT_RIGHT, etc. |
### GNN (Pydantic BaseModel)
| Class | Description | Usage |
|-------|-------------|-------|
| `FeatureConfig` | **Pydantic** — Feature configuration | Node/edge feature dimensions |
| `TrainingConfig` | **Pydantic** — Training configuration | Learning rate, epochs, optimizer |
### Graph construction functions
| Function | Description |
|---------|-------------|
| `build_property_graph()` | Main graph builder |
| `build_from_schema()` | Build from GraphSchema |
| `build_from_adjacency()` | Build from adjacency matrix |
| `GraphBuilder` | Fluent graph builder with multi-model support |
### Multi-model functions
| Function | Description |
|---------|-------------|
| `create_openai_caller()` | Create a legacy flat-string `(str) -> str` LLM caller |
| `create_openai_structured_caller()` | Create a sync structured caller `(list[dict]) -> str` — **recommended** |
| `create_openai_async_structured_caller()` | Create an async structured caller — required for `astream()` with `enable_parallel=True` |
| `LLMCallerFactory.create_openai_factory()` | Create a factory for automatic caller generation |
| `LLMConfig.merge_with()` | Merge LLM configurations (fallback) |
| `AgentProfile.with_llm_config()` | Set LLM configuration for an agent |
| `AgentProfile.has_custom_llm()` | Check whether an agent has a custom LLM config |
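A hedged sketch of the per-agent configuration helpers above (the import path and the `AgentLLMConfig` field name are assumptions based on the GraphBuilder examples elsewhere in this document):
```python
from core import AgentProfile, AgentLLMConfig  # import path assumed

profile = AgentProfile(agent_id="analyst", display_name="Analyst")
# Attach a per-agent LLM config; profiles are immutable, so a new one is returned
profile = profile.with_llm_config(AgentLLMConfig(llm_backbone="gpt-4o-mini"))
print(profile.has_custom_llm())  # True
```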
### Scheduling functions
| Function | Description |
|---------|-------------|
| `build_execution_order()` | Topological execution order |
| `get_parallel_groups()` | Parallel execution groups |
| `extract_agent_adjacency()` | Extract the agent adjacency matrix |
| `get_incoming_agents()` | Agent predecessors |
| `get_outgoing_agents()` | Agent successors |
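As a sketch of how the scheduling helpers fit together (only the names come from the table above; the import path and argument shapes are assumptions):
```python
from core.algorithms import build_execution_order, get_parallel_groups  # import path assumed

order = build_execution_order(graph)   # topologically sorted agent ids
groups = get_parallel_groups(graph)    # batches of agents safe to run concurrently
for batch in groups:
    print("can run in parallel:", batch)
```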
### Configuration classes
| Class | Description |
|------|-------------|
| `RunnerConfig` | MACPRunner configuration |
| `LLMConfig` | LLM configuration for an agent (multi-model) |
| `AgentLLMConfig` | Immutable LLM configuration for AgentProfile |
| `RoutingPolicy` | Routing policies |
| `PruningConfig` | Agent pruning configuration |
| `MemoryConfig` | Memory system configuration |
| `TrainingConfig` | GNN training configuration |
| `ErrorPolicy` | Error-handling policies |
| `FrameworkSettings` | Global framework settings |
---
## FAQ
### Why Pydantic? What benefits does it provide?
gMAS Framework is built entirely on **Pydantic 2.0+** to ensure type safety, automatic validation, and convenient serialization. Key benefits:
1. **Automatic type validation** — errors are caught when objects are created, not later at runtime
2. **Declarative typing** — IDE autocompletion, static checking (mypy, pyright)
3. **Automatic serialization** — `.model_dump()`, `.model_dump_json()` work out of the box
4. **Default values** — no need to write boilerplate
5. **Nested models** — automatic validation of nested structures
6. **Migrations** — safe schema upgrades between versions
7. **Immutability** — `frozen=True` prevents accidental mutation
```python
from core import AgentProfile
from pydantic import ValidationError
# ✅ Correct usage — Pydantic validates
agent = AgentProfile(
    agent_id="test",
    display_name="Test Agent",
    tools=["tool1", "tool2"],
)
# ❌ Incorrect — Pydantic will raise ValidationError
try:
    bad_agent = AgentProfile(
        agent_id=123,  # Must be str, not int
        display_name="Test",
    )
except ValidationError as e:
    print(e.errors())  # Detailed error info
# Automatic serialization (Pydantic v2 API)
data = agent.model_dump()  # → dict
json_str = agent.model_dump_json(indent=2)  # → JSON string
# Automatic deserialization
loaded = AgentProfile.model_validate(data)
from_json = AgentProfile.model_validate_json(json_str)
```
### Which Pydantic version is required? Is it compatible with Pydantic 1.x?
**gMAS Framework requires Pydantic 2.0+ and is not compatible with Pydantic 1.x.**
Key API differences:
- Pydantic 1.x: `.dict()`, `.parse_obj()`, `.json()`
- Pydantic 2.x: `.model_dump()`, `.model_validate()`, `.model_dump_json()`
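Side by side, the rename mapping looks like this:
```python
# Pydantic 1.x (unsupported):
#   data = agent.dict()
#   obj = AgentProfile.parse_obj(data)
#   text = agent.json()

# Pydantic 2.x (required):
data = agent.model_dump()
obj = AgentProfile.model_validate(data)
text = agent.model_dump_json()
```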
If you have Pydantic 1.x installed:
```bash
pip install --upgrade "pydantic>=2.0"
```
Version check:
```python
import pydantic
print(pydantic.VERSION) # Must be >= 2.0.0
```
### How do I use different models for different agents?
```python
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory
# Method 1: Via GraphBuilder (recommended)
builder = GraphBuilder()
builder.add_agent(
    "analyst",
    llm_backbone="gpt-4",  # Strong model
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",
    temperature=0.0,
    max_tokens=4000,
)
builder.add_agent(
    "formatter",
    llm_backbone="gpt-4o-mini",  # Cheaper model
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",
    temperature=0.3,
    max_tokens=1000,
)
builder.add_workflow_edge("analyst", "formatter")
graph = builder.build()
# Factory auto-creates callers
factory = LLMCallerFactory.create_openai_factory()
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
```
### How do I integrate with OpenAI?
```python
from openai import OpenAI
from execution import MACPRunner, LLMCallerFactory

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Method 1: Simple integration (one LLM for all agents)
def openai_caller(prompt: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content

runner = MACPRunner(llm_caller=openai_caller)

# Method 2: Multi-model integration (recommended)
# The factory creates per-agent callers via the openai SDK automatically
runner = MACPRunner(
    llm_factory=LLMCallerFactory.create_openai_factory(
        default_api_key="sk-...",
        default_base_url="https://api.openai.com/v1",
    )
)
```
### How do I use local models (Ollama)?
```python
import requests

def ollama_caller(prompt: str) -> str:
    response = requests.post(
        "http://localhost:11434/api/generate",
        json={"model": "llama2", "prompt": prompt, "stream": False},
    )
    return response.json()["response"]

runner = MACPRunner(llm_caller=ollama_caller)
```
### How do I add custom tools?
On the prompt side, tool names listed on an `AgentProfile` are included in the agent prompt:
```python
agent = AgentProfile(
    agent_id="code_executor",
    display_name="Code Executor",
    tools=["python_execute", "file_read", "file_write"],
)
```
Executable behavior comes from the tool registry: register a `BaseTool` or an `@tool` function under the same name (see the Tools section), or implement the logic inside your own LLM caller.
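If you take the latter route, a minimal sketch using the parsing API from the Tools section (`my_llm` is a placeholder for your actual model call):
```python
from tools import ToolCall, get_registry

def caller_with_tools(prompt: str) -> str:
    response = my_llm(prompt)  # placeholder LLM call
    calls = ToolCall.parse_from_response(response)
    if not calls:
        return response
    # Execute every parsed call and append the outputs to the reply
    results = get_registry().execute_all(calls)
    outputs = "\n".join(str(r.output) for r in results if r.success)
    return f"{response}\n\nTool results:\n{outputs}"
```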
### How do I visualize the graph? Which formats are supported?
gMAS Framework provides a powerful visualization system with **Pydantic styles** and support for multiple formats:
**Supported formats:**
1. **Mermaid** — for GitHub/docs
2. **ASCII art** — for terminals
3. **Graphviz DOT** — for professional visualization
4. **Rich Console** — colored terminal output
5. **PNG/SVG/PDF** — image rendering (requires system Graphviz)
```python
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
NodeStyle,
NodeShape,
MermaidDirection,
# Convenience functions
to_mermaid,
to_ascii,
print_graph,
render_to_image,
)
# Quick visualization (convenience functions)
print(to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT))
print(to_ascii(graph, show_edges=True))
print_graph(graph, format="auto") # Auto-selects colored/ascii
# Advanced custom styles (Pydantic models)
style = VisualizationStyle(
    direction=MermaidDirection.LEFT_RIGHT,
    agent_style=NodeStyle(
        shape=NodeShape.ROUND,
        fill_color="#e3f2fd",
        stroke_color="#1976d2",
        icon="🤖",
    ),
    show_weights=True,
    show_tools=True,
)
viz = GraphVisualizer(graph, style)
viz.save_mermaid("graph.md", title="My Workflow")
viz.save_dot("graph.dot")
# Image rendering (requires: pip install graphviz + system graphviz)
try:
    render_to_image(graph, "output.png", format="png", dpi=150, style=style)
    render_to_image(graph, "output.svg", format="svg", style=style)
    print("✅ Images created")
except Exception as e:
    print(f"⚠️ Install system Graphviz: {e}")
    # Ubuntu: sudo apt install graphviz
    # macOS: brew install graphviz
```
**Installing Graphviz for image rendering:**
```bash
# Python library
pip install graphviz
# System Graphviz
# Ubuntu/Debian:
sudo apt install graphviz
# macOS:
brew install graphviz
# Windows:
winget install graphviz
```
### How do I save and load a graph?
```python
import json
# Save
data = graph.to_dict()
with open("graph.json", "w") as f:
json.dump(data, f)
# Load
with open("graph.json", "r") as f:
data = json.load(f)
graph = RoleGraph.from_dict(data)
```
**Saving via Pydantic schemas (recommended):**
```python
from core.schema import GraphSchema, AgentNodeSchema, BaseEdgeSchema

# Build a schema from the graph
schema = GraphSchema(
    name="MyGraph",
    nodes={agent.agent_id: AgentNodeSchema.from_profile(agent) for agent in graph.agents},
    edges=[BaseEdgeSchema.from_edge(e) for e in graph.edges],
)
# Save (Pydantic auto-serialization)
schema_json = schema.model_dump_json(indent=2)
with open("graph_schema.json", "w") as f:
    f.write(schema_json)
# Load (Pydantic auto-validation)
with open("graph_schema.json", "r") as f:
    loaded_schema = GraphSchema.model_validate_json(f.read())
# Build a graph from the schema
from builder import build_from_schema
graph = build_from_schema(loaded_schema)
```
### How do I handle agent errors?
```python
from execution import RunnerConfig, ErrorPolicy, PruningConfig

config = RunnerConfig(
    error_policy=ErrorPolicy(
        on_error="fallback",  # skip, retry, fallback, fail
        max_retries=3,
    ),
    pruning_config=PruningConfig(
        enable_fallback=True,
        max_fallback_attempts=2,
    ),
)
result = runner.run_round(graph)
if result.errors:
    for error in result.errors:
        print(f"Error in {error.agent_id}: {error.message}")
```
### How do I track agent performance?
```python
from core.metrics import MetricsTracker
tracker = MetricsTracker()
# Runner integration
runner = MACPRunner(llm_caller=my_llm, metrics_tracker=tracker)
result = runner.run_round(graph)
# Retrieve metrics
for agent_id in graph.node_ids:
    metrics = tracker.get_node_metrics(agent_id)
    print(f"{agent_id}:")
    print(f"  Reliability: {metrics.reliability:.2%}")
    print(f"  Avg latency: {metrics.avg_latency_ms:.0f}ms")
    print(f"  Quality: {metrics.avg_quality:.2f}")
# Save metrics
tracker.save("metrics.json")
```
### How do I use dynamic topology?
```python
# Modify the graph at runtime
graph.add_node(new_agent, connections_to=["existing_agent"])
graph.add_edge("agent1", "new_agent", weight=0.8)
# Remove inefficient agents
if tracker.get_node_metrics("slow_agent").avg_latency_ms > 5000:
    graph.remove_node("slow_agent", policy=StateMigrationPolicy.DISCARD)
# Update weights based on performance
new_weights = compute_weights_from_metrics(tracker)
graph.update_communication(new_weights)
```
### How do I integrate with LangChain?
```python
# Requires: pip install langchain-openai
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(model="gpt-4")

def langchain_caller(prompt: str) -> str:
    messages = [HumanMessage(content=prompt)]
    response = llm.invoke(messages)
    return response.content
runner = MACPRunner(llm_caller=langchain_caller)
result = runner.run_round(graph)
```
### How do I implement human-in-the-loop?
```python
from execution import StreamEventType
def human_approval(agent_id: str, response: str) -> bool:
    print(f"\n{agent_id} replied: {response}")
    approval = input("Approve? (y/n): ")
    return approval.lower() == "y"

def stream_with_approval(graph):
    for event in runner.stream(graph):
        if event.event_type == StreamEventType.AGENT_OUTPUT:
            if not human_approval(event.agent_id, event.content):
                # Restart the agent with feedback
                feedback = input("Your feedback: ")
                # ... restart logic ...
        yield event
```
### How do I use a graph with multiple tasks?
```python
import asyncio
import copy

# Option 1: sequential
queries = ["Task 1", "Task 2", "Task 3"]
for query in queries:
    graph.query = query
    result = runner.run_round(graph)
    print(f"{query}: {result.final_answer}")

# Option 2: parallel (async)
async def process_queries(queries):
    tasks = []
    for query in queries:
        graph_copy = copy.deepcopy(graph)
        graph_copy.query = query
        tasks.append(runner.arun_round(graph_copy))
    results = await asyncio.gather(*tasks)
    return results
```
### How do I combine cloud and local models?
```python
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory

builder = GraphBuilder()
# Cloud model for public data
builder.add_agent(
    "public_analyzer",
    llm_backbone="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="$OPENAI_API_KEY",
)
# Local model (Ollama) for confidential data
builder.add_agent(
    "private_analyzer",
    llm_backbone="llama3:70b",
    base_url="http://localhost:11434/v1",
    api_key="not-needed",  # Ollama does not require an API key
)
builder.add_workflow_edge("public_analyzer", "private_analyzer")
graph = builder.build()
factory = LLMCallerFactory.create_openai_factory()
runner = MACPRunner(llm_factory=factory)
```
### How do I optimize LLM cost with multi-model routing?
```python
# Strategy: cheap models for routine tasks, expensive for complex tasks
builder = GraphBuilder()
# Steps 1-3: simple operations — cheap model
for i in range(3):
    builder.add_agent(
        f"processor_{i}",
        llm_backbone="gpt-4o-mini",  # $0.15/$0.60 per 1M tokens
        max_tokens=500,
    )
# Step 4: complex analysis — expensive model
builder.add_agent(
    "analyst",
    llm_backbone="gpt-4",  # $30/$60 per 1M tokens
    max_tokens=2000,
)
# Step 5: final formatting — cheap model
builder.add_agent(
    "formatter",
    llm_backbone="gpt-4o-mini",
    max_tokens=500,
)
# Savings: ~70–80% vs using gpt-4 for all steps (see the estimate below)
```
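A back-of-the-envelope check of that estimate, assuming an illustrative 10K input / 2K output tokens per step at the prices quoted in the comments above:
```python
# (input, output) price per 1M tokens, as quoted above
MINI = (0.15, 0.60)
GPT4 = (30.0, 60.0)

def step_cost(prices: tuple[float, float], tokens_in: int = 10_000, tokens_out: int = 2_000) -> float:
    return prices[0] * tokens_in / 1e6 + prices[1] * tokens_out / 1e6

all_gpt4 = 5 * step_cost(GPT4)                 # every step on gpt-4: $2.10
mixed = 4 * step_cost(MINI) + step_cost(GPT4)  # gpt-4 for the analyst only: ~$0.43
print(f"saved: {1 - mixed / all_gpt4:.0%}")    # ~79%
```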
### How do I use API keys safely?
```python
# ❌ DO NOT do this (hardcode keys)
builder.add_agent("agent", api_key="sk-1234567890...")

# ✅ Correct: use environment variables
import os

# Method 1: load from a .env file
from dotenv import load_dotenv
load_dotenv()
builder.add_agent("agent", api_key="$OPENAI_API_KEY")

# Method 2: set the env var explicitly
os.environ["OPENAI_API_KEY"] = open("keys/openai.key").read().strip()
builder.add_agent("agent", api_key="$OPENAI_API_KEY")

# Method 3: use a factory with a default key
factory = LLMCallerFactory.create_openai_factory(
    default_api_key=os.getenv("OPENAI_API_KEY"),
)
```
### How do I configure logging?
```python
from config import setup_logging
# Configure global logging
setup_logging(
    level="DEBUG",
    log_file="framework.log",
    rotation="500 MB",
    retention="10 days",
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    backtrace=True,
    diagnose=True,
)
# Use in code
from config import logger
logger.info("Starting execution")
logger.debug(f"Graph has {graph.num_nodes} nodes")
logger.error("Failed to execute agent", exc_info=True)
```
### How do I export a graph for analysis?
```python
# 1. JSON serialization
import json
graph_data = graph.to_dict()
with open("graph.json", "w") as f:
json.dump(graph_data, f, indent=2)
# 2. PyTorch Geometric format
pyg_data = graph.to_pyg_data()
torch.save(pyg_data, "graph.pt")
# 3. NetworkX format (if needed)
import networkx as nx
G = nx.DiGraph()
for node_id in graph.node_ids:
G.add_node(node_id, **graph.get_agent_by_id(node_id).to_dict())
for i, j in zip(*graph.edge_index):
src = graph.node_ids[i]
tgt = graph.node_ids[j]
G.add_edge(src, tgt, weight=graph.A_com[i, j])
nx.write_gexf(G, "graph.gexf")
# 4. CSV export
import pandas as pd
# Nodes
nodes_df = pd.DataFrame([
{"id": agent.agent_id, "name": agent.display_name, "tools": ",".join(agent.tools)}
for agent in graph.agents
])
nodes_df.to_csv("nodes.csv", index=False)
# Edges
edges = []
for i in range(graph.num_nodes):
for j in range(graph.num_nodes):
if graph.A_com[i, j] > 0:
edges.append({
"source": graph.node_ids[i],
"target": graph.node_ids[j],
"weight": graph.A_com[i, j],
})
edges_df = pd.DataFrame(edges)
edges_df.to_csv("edges.csv", index=False)
```
### How do I test agents?
```python
import pytest
from unittest.mock import Mock

from core import AgentProfile
from execution import MACPRunner, RunnerConfig, ErrorPolicy
from builder import build_property_graph  # import path assumed

def test_agent_execution():
    # Mock the LLM
    mock_llm = Mock(return_value="Mocked response")
    # Build a graph
    agents = [AgentProfile(agent_id="test", display_name="Test Agent")]
    graph = build_property_graph(agents, [], query="Test query")
    # Run
    runner = MACPRunner(llm_caller=mock_llm)
    result = runner.run_round(graph)
    # Assertions
    assert result.final_answer == "Mocked response"
    assert len(result.execution_order) == 1
    assert result.total_tokens >= 0
    mock_llm.assert_called_once()

def test_error_handling():
    # Mock the LLM with an error
    mock_llm = Mock(side_effect=Exception("LLM error"))
    agent = AgentProfile(agent_id="test", display_name="Test Agent")
    graph = build_property_graph([agent], [], query="Test")
    config = RunnerConfig(
        error_policy=ErrorPolicy(on_error="skip", max_retries=2),
    )
    runner = MACPRunner(llm_caller=mock_llm, config=config)
    result = runner.run_round(graph)
    assert len(result.errors) > 0
    assert result.final_answer is None

def test_parallel_execution():
    mock_llm = Mock(return_value="Mocked response")
    agents = [
        AgentProfile(agent_id=f"agent_{i}", display_name=f"Agent {i}")
        for i in range(3)
    ]
    edges = [("agent_0", "agent_1"), ("agent_0", "agent_2")]
    graph = build_property_graph(agents, edges, query="Test")
    config = RunnerConfig(enable_parallel=True, max_parallel_size=2)
    runner = MACPRunner(llm_caller=mock_llm, config=config)
    result = runner.run_round(graph)
    assert len(result.execution_order) == 3
```
### How do I scale to large graphs?
```python
# 1. Use pruning to cut inefficient paths
config = RunnerConfig(
    pruning_config=PruningConfig(
        min_weight_threshold=0.2,
        min_probability_threshold=0.1,
        token_budget=5000,
    ),
)
# 2. Use parallel execution
config.enable_parallel = True
config.max_parallel_size = 10
# 3. Use beam search to cap paths
config.routing_policy = RoutingPolicy.BEAM_SEARCH
scheduler = AdaptiveScheduler(policy=RoutingPolicy.BEAM_SEARCH, beam_width=5)
# 4. Use subgraph filtering
from core.algorithms import GraphAlgorithms, SubgraphFilter
algo = GraphAlgorithms(graph)
subgraph = algo.filter_subgraph(SubgraphFilter(
    max_hop_distance=3,
    from_node="start",
    min_edge_weight=0.3,
))
# 5. Use async for parallel requests
async def process_large_graph(graph):
    results = await runner.arun_round(graph)
    return results
```
---
## License
---
## Support
- GitHub Issues: [github.com/yourusername/rustworkx-agent-framework/issues](https://github.com/yourusername/rustworkx-agent-framework/issues)
- Documentation: [github.com/yourusername/rustworkx-agent-framework#readme](https://github.com/yourusername/rustworkx-agent-framework#DOCUMENTATION)
---
<p align="center">
Made with ❤️ for the multi-agent systems developer community
</p>