"""
Stream chat service implemented with Agno SDK (Agent + tools + DB).
"""
from __future__ import annotations
import ast
import asyncio
import copy
import json
import os
import re
import time
from collections.abc import AsyncGenerator
from datetime import datetime
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
from agno.agent import Agent, RunEvent
from agno.models.message import Message
from agno.run.agent import RunOutput, ToolCallCompletedEvent, ToolCallStartedEvent
from agno.run.team import TeamRunEvent
from agno.utils.log import logger
from ..models.stream_chat import (
AgentStatusEvent,
DoneEvent,
ErrorEvent,
FormRequestEvent, # New: HITL form request event
SourceEvent,
StreamChatRequest,
TextEvent,
ThoughtEvent,
ToolCallEvent,
ToolResultEvent,
)
from .agent_registry import get_agent_for_provider, build_team, resolve_agent_config
from .hitl_storage import get_hitl_storage
from .summary_service import update_session_summary
from .tool_registry import resolve_tool_name
MEMORY_OPTIMIZE_THRESHOLD = 50
MEMORY_OPTIMIZE_INTERVAL_SECONDS = 60 * 60 * 12
THINK_TAG_REGEX = re.compile(r"</?(?:think|thought)>", re.IGNORECASE)
PROTOCOL_TAG_REGEX = re.compile(
    r"(?:<\s*[|｜]\s*(?P<tag>[a-zA-Z0-9_]+)\s*[|｜]\s*>)"
    r"|(?:<\s*(?P<dsml_close>/?)\s*[|｜]\s*DSML\s*[|｜]\s*(?P<dsml_body>[^>]+)>)",
    re.IGNORECASE,
)
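# Illustrative matches (the fullwidth "｜" variant is an assumption based on
# DeepSeek-style protocol tokens, which mix ASCII and fullwidth bars):
#   "<|tool_calls_begin|>"       -> named group tag="tool_calls_begin"
#   '</|DSML|invoke name="x">'   -> dsml_close="/", dsml_body='invoke name="x"'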
TOOL_TRACE_BEGIN_TAGS = {
"tool_calls_begin",
"tool_calls_section_begin",
"tool_call_begin",
"tool_argument_begin",
"tool_call_argument_begin",
}
TOOL_TRACE_END_TAGS = {
"tool_argument_end",
"tool_call_argument_end",
"tool_call_end",
"tool_calls_end",
"tool_calls_section_end",
}
def _strip_internal_tool_trace(text: str) -> str:
"""Remove explicit protocol marker tokens without truncating normal text."""
if not text:
return ""
cleaned = str(text)
cleaned = re.sub(r"</?(?:think|thought)>", "", cleaned, flags=re.IGNORECASE)
# Remove protocol markers like <|tool_calls_begin|> and spaced variants.
cleaned = re.sub(r"<\s*[||]\s*[^|>||]*\s*[||]\s*>", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"</?\s*[||]\s*DSML\s*[||]\s*[^>]*>", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"</?(?:session_memory|today_local_time)>", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"\[SYSTEM INJECTED CONTEXT\]", "", cleaned, flags=re.IGNORECASE)
# Remove inline tool-call-like snippets leaked by some models.
cleaned = re.sub(
r"([::]\s*)?[a-zA-Z_][a-zA-Z0-9_]{1,80}\s*\{[^{}\n]{0,1200}\}",
"",
cleaned,
flags=re.IGNORECASE,
)
return cleaned
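# Illustrative example (the leaked-trace shape is an assumption matching the
# regexes above):
#   _strip_internal_tool_trace('Hi <|tool_calls_begin|>get_time {"tz": "UTC"}<|tool_calls_end|> there')
#   -> 'Hi  there'  (protocol markers and the inline call snippet are removed)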
def _split_content_by_think_tags(text: str, in_think: bool) -> tuple[list[tuple[str, str]], bool]:
"""Split a content chunk into ordered thought/text segments by <think>/<thought> tags."""
if not text:
return [], in_think
segments: list[tuple[str, str]] = []
cursor = 0
current_in_think = in_think
for match in THINK_TAG_REGEX.finditer(text):
start, end = match.span()
if start > cursor:
piece = text[cursor:start]
if piece:
segments.append(("thought" if current_in_think else "text", piece))
tag = match.group(0).lower()
current_in_think = not tag.startswith("</")
cursor = end
if cursor < len(text):
piece = text[cursor:]
if piece:
segments.append(("thought" if current_in_think else "text", piece))
return segments, current_in_think
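# Illustrative example: segments preserve source order across tag boundaries.
#   _split_content_by_think_tags("a<think>b</think>c", False)
#   -> ([("text", "a"), ("thought", "b"), ("text", "c")], False)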
def _strip_inline_tool_protocol(
text: str,
tool_trace_depth: int,
protocol_tail: str,
) -> tuple[str, int, str, bool]:
"""
Strip inline tool-protocol payloads from content chunks safely across chunk boundaries.
Returns:
cleaned_text, next_tool_trace_depth, next_protocol_tail, had_protocol_tokens
"""
combined = f"{protocol_tail}{text or ''}"
if not combined:
return "", tool_trace_depth, "", False
# Keep trailing incomplete protocol marker for next chunk.
tail = ""
tail_start = -1
    for m in re.finditer(r"<\s*/?\s*[|｜]", combined):
tail_start = m.start()
if tail_start != -1 and ">" not in combined[tail_start:]:
tail = combined[tail_start:]
combined = combined[:tail_start]
if not combined:
return "", tool_trace_depth, tail, bool(tail)
# Non-destructive stripping: remove marker tokens only.
matches = list(PROTOCOL_TAG_REGEX.finditer(combined))
had_protocol = bool(matches)
if not had_protocol:
return combined, 0, tail, bool(tail)
cleaned = PROTOCOL_TAG_REGEX.sub("", combined)
return cleaned, 0, tail, True
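# Illustrative chunk-boundary behavior (traced from the code above):
#   _strip_inline_tool_protocol("hello <|tool", 0, "")
#   -> ("hello ", 0, "<|tool", True)   # incomplete marker carried as tail
#   _strip_inline_tool_protocol("_calls_begin|>world", 0, "<|tool")
#   -> ("world", 0, "", True)          # tail + next chunk reassemble the marker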
def _squash_whitespace(text: Any) -> str:
return re.sub(r"\s+", "", str(text or ""))
def _extract_agent_info_from_event(
run_event: Any,
leader_id: str | None = None,
leader_name: str | None = None,
leader_emoji: str | None = None,
agent_metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Extract agent identification from a run event.
For Team mode, member events have agent_id and agent_name directly on the event.
If these are set, it's a member event; otherwise it's from the leader.
Returns:
dict with 'agent_id', 'agent_name', 'agent_role', 'agent_emoji', 'model', 'provider' keys
"""
agent_id = getattr(run_event, "agent_id", None)
agent_name = getattr(run_event, "agent_name", None)
agent_emoji = getattr(run_event, "agent_emoji", None)
# Check if this matches the leader.
# Important: some providers might use slightly different names, but if IDs match it's definitely leader.
is_leader = False
if leader_id and agent_id == leader_id:
is_leader = True
elif not agent_id and not agent_name:
# Default to leader if no info is present
is_leader = True
elif not leader_id and agent_name == leader_name:
is_leader = True
# If the ID starts with 'qurio-' (default Agno IDs often follow this pattern)
# and we are in team mode, and it's not explicitly a member ID in our metadata,
# it's highly likely the leader's initialization event.
elif str(agent_id or "").startswith("qurio-") and agent_metadata:
is_leader = agent_id not in agent_metadata
if is_leader:
res = {
"agent_id": leader_id,
"agent_name": leader_name,
"agent_role": "leader",
"agent_emoji": leader_emoji,
"model": None,
"provider": None,
}
if agent_metadata and leader_id in agent_metadata:
res.update(agent_metadata[leader_id])
return res
# Otherwise treat as member
if agent_id or agent_name:
# Log at DEBUG level to reduce main stream noise, as switch logs will provide context.
logger.debug(f"[TEAM] Member event detected: agent_id={agent_id}, agent_name={agent_name}")
res = {
"agent_id": agent_id,
"agent_name": agent_name,
"agent_role": "member",
"agent_emoji": agent_emoji,
"model": None,
"provider": None,
}
# Enrich from metadata if possible
if agent_metadata:
if agent_id in agent_metadata:
res.update(agent_metadata[agent_id])
elif agent_name in agent_metadata:
res.update(agent_metadata[agent_name])
elif not agent_id and agent_name: # Fallback lookup by name if ID missing on event
for meta_id, meta in agent_metadata.items():
if meta.get("name") == agent_name:
res.update(meta)
res["agent_id"] = meta_id
break
return res
# Fallback to leader if totally ambiguous
res = {
"agent_id": leader_id,
"agent_name": leader_name,
"agent_role": "leader",
"agent_emoji": leader_emoji,
"model": None,
"provider": None,
}
if agent_metadata and leader_id in agent_metadata:
res.update(agent_metadata[leader_id])
return res
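# Illustrative example (types.SimpleNamespace stands in for an Agno member event;
# the ids and metadata are hypothetical):
#   ev = types.SimpleNamespace(agent_id="m1", agent_name="Researcher")
#   _extract_agent_info_from_event(ev, leader_id="lead", agent_metadata={"m1": {"model": "gpt-4o", "provider": "openai"}})
#   -> role "member", enriched with model "gpt-4o" and provider "openai"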
def _is_reasoning_duplicate_of_content(reasoning: str, content: str) -> bool:
"""
Detect provider chunks where answer text is mirrored in reasoning_content.
This prevents answer paragraphs from being rendered as a second thought block.
"""
reasoning_flat = _squash_whitespace(reasoning)
content_flat = _squash_whitespace(content)
if not reasoning_flat or not content_flat:
return False
if reasoning_flat == content_flat:
return True
shorter, longer = (
(reasoning_flat, content_flat)
if len(reasoning_flat) <= len(content_flat)
else (content_flat, reasoning_flat)
)
if len(shorter) < 12:
return False
if shorter in longer and len(shorter) >= int(len(longer) * 0.75):
return True
return False
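# Illustrative example: whitespace is squashed before comparison, so exact
# mirrors are caught even when spacing differs.
#   _is_reasoning_duplicate_of_content("The answer is 42.", "The  answer is 42.")  -> True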
def _is_stream_trace_enabled() -> bool:
value = str(os.getenv("QURIO_STREAM_TRACE", "")).strip().lower()
return value in {"1", "true", "yes", "on", "debug"}
def _is_verbose_logs_enabled() -> bool:
value = str(os.getenv("QURIO_VERBOSE_LOGS", "0")).strip().lower()
return value in {"1", "true", "yes", "on", "debug"}
def _log_verbose_info(message: str) -> None:
if _is_verbose_logs_enabled():
logger.info(message)
else:
logger.debug(message)
def _preview(text: Any, limit: int = 140) -> str:
raw = str(text or "").replace("\n", "\\n")
return raw[:limit] + ("..." if len(raw) > limit else "")
def _extract_message_from_payload(payload: Any) -> str | None:
if payload is None:
return None
if hasattr(payload, "model_dump"):
try:
payload = payload.model_dump()
except Exception:
payload = str(payload)
if isinstance(payload, dict):
for key in ("message", "error", "detail", "msg"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
nested = _extract_message_from_payload(value)
if nested:
return nested
for value in payload.values():
nested = _extract_message_from_payload(value)
if nested:
return nested
return None
if isinstance(payload, (list, tuple)):
for item in payload:
nested = _extract_message_from_payload(item)
if nested:
return nested
return None
text = str(payload).strip()
if not text:
return None
# Agno RunErrorEvent repr: RunErrorEvent(..., content='Unknown model error', ...)
run_error_content_match = re.search(
r"""content\s*=\s*(['"])(.*?)\1""",
text,
re.IGNORECASE | re.DOTALL,
)
if run_error_content_match and run_error_content_match.group(2).strip():
return run_error_content_match.group(2).strip()
for parser in (json.loads, ast.literal_eval):
try:
parsed = parser(text)
nested = _extract_message_from_payload(parsed)
if nested:
return nested
except Exception:
pass
json_like = re.search(r"(\{[\s\S]*\})", text)
if json_like:
snippet = json_like.group(1)
for parser in (json.loads, ast.literal_eval):
try:
parsed = parser(snippet)
nested = _extract_message_from_payload(parsed)
if nested:
return nested
except Exception:
pass
message_match = re.search(r"""['"]message['"]\s*:\s*['"](.+?)['"]""", text, re.IGNORECASE)
if message_match and message_match.group(1).strip():
return message_match.group(1).strip()
return text
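# Illustrative example: nested provider payloads resolve to the innermost message.
#   _extract_message_from_payload({"error": {"message": "Invalid API key"}})
#   -> "Invalid API key"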
def _extract_best_error_message(exc: Exception | Any) -> str:
"""Extract the most actionable provider message from nested exceptions."""
generic_markers = ("unknown model error", "unknown error", "model provider error")
def _is_generic(text: str) -> bool:
lowered = text.strip().lower()
return any(marker in lowered for marker in generic_markers)
candidates: list[str] = []
queue: list[Any] = [exc]
seen: set[int] = set()
while queue:
current = queue.pop(0)
if current is None:
continue
marker = id(current)
if marker in seen:
continue
seen.add(marker)
extracted = _extract_message_from_payload(current)
if extracted and extracted.strip():
candidates.append(extracted.strip())
if isinstance(current, BaseException):
queue.append(getattr(current, "__cause__", None))
queue.append(getattr(current, "__context__", None))
args = getattr(current, "args", None)
if isinstance(args, tuple):
queue.extend(args)
for attr in ("content", "error", "message", "detail", "model_provider_data"):
if hasattr(current, attr):
queue.append(getattr(current, attr, None))
for msg in candidates:
if not _is_generic(msg):
return msg
if candidates:
# Avoid dumping full event repr like "RunErrorEvent(...)" to UI.
filtered = [msg for msg in candidates if not msg.strip().lower().startswith("runerrorevent(")]
if filtered:
return min(filtered, key=len)
return min(candidates, key=len)
return str(exc or "Unknown error")
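# Illustrative example (the wrapped-JSON error body is an assumption; many
# OpenAI-compatible providers stringify errors this way):
#   _extract_best_error_message(ValueError('{"error": {"message": "Rate limit exceeded"}}'))
#   -> "Rate limit exceeded"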
def _extract_text_chunk(run_event: Any) -> str:
"""Extract assistant text only from explicit content fields.
Shared between stream_chat() and _continue_hitl_run() to avoid duplication.
"""
provider_data = getattr(run_event, "model_provider_data", None)
if isinstance(provider_data, dict):
choices = provider_data.get("choices") or []
if choices and isinstance(choices[0], dict):
delta = choices[0].get("delta") or {}
raw_content = delta.get("content")
if isinstance(raw_content, str) and raw_content:
return raw_content
if isinstance(raw_content, list):
parts: list[str] = []
for item in raw_content:
if isinstance(item, dict):
text_part = item.get("text") or item.get("content")
if text_part:
parts.append(str(text_part))
elif isinstance(item, str) and item:
parts.append(item)
if parts:
return "".join(parts)
raw_reasoning = delta.get("reasoning_content")
if isinstance(raw_reasoning, str) and raw_reasoning:
return ""
if isinstance(raw_reasoning, list):
reasoning_parts: list[str] = []
for item in raw_reasoning:
if isinstance(item, dict):
text_part = item.get("text") or item.get("content")
if text_part:
reasoning_parts.append(str(text_part))
elif isinstance(item, str) and item:
reasoning_parts.append(item)
if reasoning_parts:
return ""
content = getattr(run_event, "content", None)
if isinstance(content, str) and content:
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict):
text_part = item.get("text") or item.get("content")
if isinstance(text_part, str) and text_part:
parts.append(text_part)
elif isinstance(item, str) and item:
parts.append(item)
if parts:
return "".join(parts)
return ""
def _extract_reasoning_chunk(
run_event: Any,
trace_fn: Any = None,
) -> str:
"""Extract reasoning/thought content from a stream event.
Shared between stream_chat() and _continue_hitl_run() to avoid duplication.
``trace_fn`` is an optional callable(stage, **kwargs) for trace logging.
"""
def _trace(stage: str, **kwargs: Any) -> None:
if trace_fn:
trace_fn(stage, **kwargs)
provider_data = getattr(run_event, "model_provider_data", None)
if isinstance(provider_data, dict):
choices = provider_data.get("choices") or []
if choices and isinstance(choices[0], dict):
delta = choices[0].get("delta") or {}
raw_reasoning = delta.get("reasoning_content")
if isinstance(raw_reasoning, str) and raw_reasoning:
_trace("reasoning_source", source="provider_data.delta.reasoning_content")
return raw_reasoning
if isinstance(raw_reasoning, list):
parts: list[str] = []
for item in raw_reasoning:
if isinstance(item, dict):
text_part = item.get("text") or item.get("content")
if text_part:
parts.append(str(text_part))
elif isinstance(item, str) and item:
parts.append(item)
if parts:
_trace("reasoning_source", source="provider_data.delta.reasoning_content[]")
return "".join(parts)
reasoning = getattr(run_event, "reasoning_content", None)
if isinstance(reasoning, str) and reasoning:
_trace("reasoning_source", source="run_event.reasoning_content")
return reasoning
if isinstance(reasoning, list):
parts: list[str] = []
for item in reasoning:
if isinstance(item, dict):
text_part = item.get("text") or item.get("content")
if text_part:
parts.append(str(text_part))
elif isinstance(item, str) and item:
parts.append(item)
if parts:
_trace("reasoning_source", source="run_event.reasoning_content[]")
return "".join(parts)
# NOTE:
# Some providers (e.g. DeepSeek-compatible streams) may place
# assistant answer tokens under `delta.reasoning`.
# Treating that field as reasoning can misclassify answer text as thought.
# Keep reasoning extraction strict to `reasoning_content` only.
return ""
def _is_raw_events_log_enabled() -> bool:
value = str(os.getenv("QURIO_RAW_EVENTS_LOG", "0")).strip().lower()
    return value not in {"", "0", "false", "off", "no"}
def _raw_events_log_path() -> Path:
configured = str(os.getenv("QURIO_RAW_EVENTS_LOG_PATH", "")).strip()
if configured:
return Path(configured)
logs_dir = Path(__file__).resolve().parents[2] / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
date_tag = datetime.utcnow().strftime("%Y%m%d")
return logs_dir / f"agno_raw_events_{date_tag}.jsonl"
def _append_raw_event_log(
*,
phase: str,
request: StreamChatRequest,
run_id: str | None,
run_event: Any,
) -> None:
if not _is_raw_events_log_enabled():
return
try:
event_name = str(getattr(run_event, "event", "") or "")
content_chunk = _extract_text_chunk(run_event)
reasoning_chunk = _extract_reasoning_chunk(run_event)
payload = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"phase": phase,
"provider": request.provider,
"model": request.model,
"conversation_id": request.conversation_id,
"run_id": run_id or getattr(run_event, "run_id", None),
"event_name": event_name,
"raw_event_type": type(run_event).__name__,
"has_content": bool(str(content_chunk or "").strip()),
"has_reasoning_content": bool(str(reasoning_chunk or "").strip()),
"content_preview": _preview(content_chunk),
"reasoning_preview": _preview(reasoning_chunk),
"raw_event": repr(run_event),
}
log_path = _raw_events_log_path()
with log_path.open("a", encoding="utf-8") as f:
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
except Exception:
# Never break stream flow due to diagnostics logging failures.
return
def _extract_completed_content_and_output(
run_event: Any,
streamed_content: str = "",
) -> tuple[str, Any]:
"""
Extract final assistant content/output from RunCompleted-style events.
Shared between normal stream_chat() and HITL continuation to keep behavior aligned.
"""
agn_content = getattr(run_event, "content", None)
run_response = getattr(run_event, "run_response", None)
if not agn_content and run_response is not None:
agn_content = getattr(run_response, "content", None)
final_content = streamed_content or ""
output = None
# Structured output should override streamed text to preserve canonical payload.
if agn_content and hasattr(agn_content, "model_dump"):
output = agn_content
final_content = json.dumps(agn_content.model_dump(), ensure_ascii=False)
elif isinstance(agn_content, (dict, list)):
output = agn_content
final_content = json.dumps(agn_content, ensure_ascii=False)
elif isinstance(agn_content, str) and agn_content.strip() and not final_content:
final_content = agn_content
# Fallback for providers that only keep final assistant text in run_response.messages.
if not final_content and run_response is not None:
try:
rr_messages = getattr(run_response, "messages", None) or []
for rr_msg in reversed(rr_messages):
rr_role = getattr(rr_msg, "role", None)
rr_content = getattr(rr_msg, "content", None)
if rr_role != "assistant":
continue
extracted = _extract_text_from_message_content(rr_content).strip()
if extracted:
final_content = extracted
break
except Exception:
pass
return final_content, output
def _extract_text_from_message_content(content: Any) -> str:
"""
Best-effort text extraction for provider-specific assistant message payloads.
"""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
continue
if isinstance(item, dict):
text_part = item.get("text")
if isinstance(text_part, str):
parts.append(text_part)
continue
content_part = item.get("content")
if isinstance(content_part, str):
parts.append(content_part)
continue
if isinstance(content_part, (list, dict)):
nested = _extract_text_from_message_content(content_part)
if nested:
parts.append(nested)
parts_part = item.get("parts")
if isinstance(parts_part, (list, dict)):
nested = _extract_text_from_message_content(parts_part)
if nested:
parts.append(nested)
return "".join(parts)
if isinstance(content, dict):
text_part = content.get("text")
if isinstance(text_part, str):
return text_part
content_part = content.get("content")
if isinstance(content_part, str):
return content_part
if isinstance(content_part, (list, dict)):
nested = _extract_text_from_message_content(content_part)
if nested:
return nested
parts_part = content.get("parts")
if isinstance(parts_part, (list, dict)):
nested = _extract_text_from_message_content(parts_part)
if nested:
return nested
return ""
def _coerce_tool_result_payload(output: Any) -> Any:
"""
Normalize tool output payload into JSON-friendly objects when possible.
"""
normalized = output
if normalized and isinstance(normalized, str):
try:
normalized = json.loads(normalized)
except json.JSONDecodeError:
pass
if isinstance(normalized, str):
try:
parsed = ast.literal_eval(normalized)
if isinstance(parsed, dict):
normalized = parsed
except (ValueError, SyntaxError):
pass
return normalized
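# Illustrative example: JSON is tried first, Python-literal dicts as a fallback.
#   _coerce_tool_result_payload('{"ok": true}')  -> {"ok": True}
#   _coerce_tool_result_payload("{'ok': True}")  -> {"ok": True}  (via ast.literal_eval)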
def _build_tool_result_event(
tool: Any,
duration_ms: int | None,
normalize_tool_output_fn: Any,
agent_info: dict[str, str | None] | None = None,
) -> tuple[dict[str, Any], Any]:
"""
Build frontend ToolResultEvent payload and return parsed tool output.
"""
output = _coerce_tool_result_payload(normalize_tool_output_fn(getattr(tool, "result", None)))
event = ToolResultEvent(
id=getattr(tool, "tool_call_id", None),
name=getattr(tool, "tool_name", "") or "",
status="done" if not getattr(tool, "tool_call_error", None) else "error",
output=output,
durationMs=duration_ms,
agent_id=agent_info.get("agent_id") if agent_info else None,
agent_name=agent_info.get("agent_name") if agent_info else None,
agent_role=agent_info.get("agent_role") if agent_info else None,
agent_emoji=agent_info.get("agent_emoji") if agent_info else None,
).model_dump(by_alias=True, exclude_none=True)
return event, output
def _build_tool_call_event(
tool: Any,
text_index: int,
include_none: bool = False,
agent_info: dict[str, str | None] | None = None,
) -> dict[str, Any]:
payload = ToolCallEvent(
id=getattr(tool, "tool_call_id", None),
name=getattr(tool, "tool_name", "") or "",
arguments=json.dumps(getattr(tool, "tool_args", None) or {}),
text_index=text_index,
agent_id=agent_info.get("agent_id") if agent_info else None,
agent_name=agent_info.get("agent_name") if agent_info else None,
agent_role=agent_info.get("agent_role") if agent_info else None,
agent_emoji=agent_info.get("agent_emoji") if agent_info else None,
)
if include_none:
return payload.model_dump(by_alias=True, exclude_none=False)
return payload.model_dump(by_alias=True, exclude_none=True)
def _normalize_interactive_form_fields(raw_fields: Any) -> list[dict[str, Any]]:
"""
Normalize interactive_form fields to a strict list[dict].
Some providers/tool runtimes return fields as a JSON string; this helper
parses and sanitizes that shape so FormRequestEvent validation won't fail.
"""
parsed = raw_fields
if isinstance(parsed, str):
text = parsed.strip()
if not text:
return []
try:
parsed = json.loads(text)
except Exception:
try:
parsed = ast.literal_eval(text)
except Exception:
logger.warning("interactive_form fields is invalid string, fallback to empty list")
return []
if isinstance(parsed, dict):
maybe_fields = parsed.get("fields")
if isinstance(maybe_fields, list):
parsed = maybe_fields
else:
return []
if not isinstance(parsed, list):
return []
normalized: list[dict[str, Any]] = []
used_names: set[str] = set()
def _slugify_name(value: Any, fallback_index: int) -> str:
base = re.sub(r"[^a-zA-Z0-9_]+", "_", str(value or "").strip().lower()).strip("_")
if not base:
base = f"field_{fallback_index}"
candidate = base
suffix = 2
while candidate in used_names:
candidate = f"{base}_{suffix}"
suffix += 1
used_names.add(candidate)
return candidate
def _normalize_field_type(value: Any) -> str:
candidate = str(value or "").strip().lower()
if candidate in {"text", "number", "select", "checkbox", "range"}:
return candidate
return "text"
for idx, item in enumerate(parsed, start=1):
if isinstance(item, str):
label = item.strip()
if not label:
continue
normalized.append(
{
"name": _slugify_name(label, idx),
"label": label,
"type": "text",
"required": False,
}
)
continue
if not isinstance(item, dict):
logger.warning("interactive_form field item is not dict, skipped: %s", type(item).__name__)
continue
raw_name = item.get("name")
raw_label = item.get("label")
label = str(raw_label or raw_name or f"Field {idx}").strip() or f"Field {idx}"
field_name = _slugify_name(raw_name or label, idx)
field_type = _normalize_field_type(item.get("type"))
normalized_item = dict(item)
normalized_item["name"] = field_name
normalized_item["label"] = label
normalized_item["type"] = field_type
normalized_item["required"] = bool(item.get("required", False))
normalized.append(normalized_item)
return normalized
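# Illustrative example (the input shapes are assumptions the normalizer accepts):
#   _normalize_interactive_form_fields('["City", {"label": "Date", "type": "date"}]')
#   -> [{"name": "city", "label": "City", "type": "text", "required": False},
#       {"label": "Date", "type": "text", "name": "date", "required": False}]
# The unknown type "date" is coerced to "text" so FormRequestEvent validation holds.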
def _extract_interactive_form_payload(req: Any, default_title: str) -> tuple[str | None, str, list[dict[str, Any]]]:
"""Extract interactive form payload from requirement in a validation-safe way."""
tool_args = req.tool_execution.tool_args if getattr(req, "tool_execution", None) else {}
if not isinstance(tool_args, dict):
tool_args = {}
form_id = tool_args.get("id")
title = str(tool_args.get("title") or default_title)
fields = _normalize_interactive_form_fields(tool_args.get("fields", []))
return form_id, title, fields
def _is_interactive_form_requirement(req: Any) -> bool:
tool_exec = getattr(req, "tool_execution", None)
tool_name = getattr(tool_exec, "tool_name", None) if tool_exec else None
return tool_name == "interactive_form"
class StreamChatService:
"""Stream chat service implemented using Agno Agent streaming events."""
def __init__(self) -> None:
self._last_memory_optimization: dict[str, float] = {}
async def stream_chat(
self,
request: StreamChatRequest,
) -> AsyncGenerator[dict[str, Any], None]:
"""
Stream chat completion with HITL support.
If request.run_id is present, this is a resumption request after form submission.
Otherwise, this is a normal chat request.
"""
# ================================================================
# HITL: Check if this is a resumption request
# ================================================================
if request.run_id and request.field_values:
_log_verbose_info(f"Detected HITL resumption request (run_id: {request.run_id})")
async for event in self._continue_hitl_run(request):
yield event
return
# Debug: Log request fields for expert mode
if request.expert_mode:
logger.info(f"[DEBUG] Expert mode request - leader_agent_id: {getattr(request, 'leader_agent_id', 'NOT_FOUND')}, team_agent_ids: {getattr(request, 'team_agent_ids', [])}")
# ================================================================
# Normal chat flow
# ================================================================
try:
if not request.provider:
raise ValueError("Missing required field: provider")
if not request.messages:
raise ValueError("Missing required field: messages")
            # Enable skills for the primary user-facing chat agent
request.enable_skills = True
# Build standard agent or Team
agent_metadata: dict[str, Any] = {}
if request.expert_mode and getattr(request, "team_agent_ids", []):
# Log leader configuration for debugging
logger.info(f"[TEAM] Building team - leader_agent_id: {getattr(request, 'leader_agent_id', None)}, team_agent_ids: {request.team_agent_ids}")
# 1. Resolve Leader Configuration if ID is provided
if getattr(request, "leader_agent_id", None):
request = resolve_agent_config(request.leader_agent_id, request)
logger.info(f"[TEAM] Leader resolved - agent_id: {getattr(request, 'agent_id', None)}, agent_name: {getattr(request, 'agent_name', None)}")
if request.agent_id:
agent_metadata[request.agent_id] = {
"model": request.model,
"provider": request.provider,
}
# 2. Resolve Member Agents
members = []
for a_id in request.team_agent_ids:
                    sub_req = copy.deepcopy(request)
sub_req.expert_mode = False
# Fetch actual member config from DB
member_req = resolve_agent_config(a_id, sub_req)
members.append(get_agent_for_provider(member_req))
# Capture member metadata
if member_req.agent_id:
agent_metadata[member_req.agent_id] = {
"model": member_req.model,
"provider": member_req.provider,
}
if member_req.agent_name:
agent_metadata[member_req.agent_name] = {
"model": member_req.model,
"provider": member_req.provider,
}
# 3. Build the Team with resolved leader (request) and members
agent = build_team(request, members)
is_team_mode = True
else:
agent = get_agent_for_provider(request)
is_team_mode = False
sources_map: dict[str, Any] = {}
full_content = ""
full_thought = ""
tool_start_times: dict[str, float] = {}
should_break_next_thought = False
in_reasoning_phase = False
reasoning_closed_for_current_cycle = False
in_content_think_block = False
inline_tool_trace_depth = 0
inline_protocol_tail = ""
stream_trace = _is_stream_trace_enabled()
# Current agent info for Team mode (updated per event)
current_agent_info: dict[str, Any] = {"agent_id": None, "agent_name": None}
last_active_agent_id = None
def trace_stream(stage: str, **kwargs: Any) -> None:
if not stream_trace:
return
payload = ", ".join([f"{k}={v}" for k, v in kwargs.items()])
logger.info(f"[STREAM_TRACE][main] {stage} | {payload}")
def emit_thought_part(part: str):
nonlocal full_thought, full_content, should_break_next_thought, in_reasoning_phase, reasoning_closed_for_current_cycle
text = _strip_internal_tool_trace(str(part or ""))
if not text or not text.strip():
return
should_break_next_thought = False
in_reasoning_phase = True
full_thought += text
trace_stream("emit_reasoning", reasoning_preview=_preview(text))
current_text_index = len(full_content)
yield ThoughtEvent(
content=text,
text_index=current_text_index,
agent_id=current_agent_info.get("agent_id"),
agent_name=current_agent_info.get("agent_name"),
agent_status=current_agent_info.get("status"),
).model_dump(by_alias=True, exclude_none=True)
def process_text(text: str):
nonlocal full_content, in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle
clean_text = _strip_internal_tool_trace(text)
if clean_text:
has_visible_text = bool(clean_text.strip())
if has_visible_text:
in_reasoning_phase = False
should_break_next_thought = True
reasoning_closed_for_current_cycle = True
full_content += clean_text
yield TextEvent(
content=clean_text,
agent_id=current_agent_info.get("agent_id"),
agent_name=current_agent_info.get("agent_name"),
agent_status=current_agent_info.get("status"),
).model_dump(by_alias=True, exclude_none=True)
# Agent status tracking for Team mode
agent_statuses: dict[str, str] = {}
            def set_agent_status(agent_id: str | None, status: str):
                if not agent_id:
                    return None
                if agent_statuses.get(agent_id) == status:
                    return None
                agent_statuses[agent_id] = status
                return AgentStatusEvent(agentId=agent_id, status=status).model_dump(by_alias=True)
async def update_status_and_yield(agent_id: str | None, status: str):
event = set_agent_status(agent_id, status)
if event:
yield event
# Context management now handled by Agno's num_history_runs parameter
messages = request.messages
pre_events: list[dict[str, Any]] = []
messages = self._inject_local_time_context(messages, request, pre_events)
enabled_tool_names = self._collect_enabled_tool_names(request)
messages = self._inject_tool_guidance(messages, enabled_tool_names, request)
for event in pre_events:
yield event
# ================================================================
# MANUAL CONTEXT MANAGEMENT (Rolling Summary + Fixed Window)
# ================================================================
# 1. Fetch Session Summary from DB
session_summary_text = None
old_summary_json = None
if request.conversation_id:
try:
from ..models.db import DbFilter, DbQueryRequest
from .db_service import execute_db_async, get_db_adapter
adapter = get_db_adapter(request.database_provider)
if adapter:
req = DbQueryRequest(
providerId=adapter.config.id,
action="select",
table="conversations",
columns=["session_summary"],
filters=[DbFilter(op="eq", column="id", value=request.conversation_id)],
maybeSingle=True,
)
result = await execute_db_async(adapter, req)
if result.data and isinstance(result.data, dict):
row = result.data
raw_summary = row.get("session_summary")
if raw_summary:
# Parsing handled by adapter often, but double check
if isinstance(raw_summary, str):
try:
old_summary_json = json.loads(raw_summary)
except (ValueError, json.JSONDecodeError):
pass
elif isinstance(raw_summary, dict):
old_summary_json = raw_summary
if old_summary_json:
session_summary_text = old_summary_json.get("summary")
                except Exception as e:
                    logger.warning(f"Failed to fetch session summary: {e}")
# 2. Slice History (Turn-Based Window)
# Strategy: Keep all System messages + Last N User turns (User + AI + Tools)
# N comes from frontend context setting: contextTurns.
raw_turn_limit = request.context_turn_limit
turn_limit = (
max(1, min(50, int(raw_turn_limit)))
if isinstance(raw_turn_limit, int) and raw_turn_limit > 0
else 2
)
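            # Illustrative clamping (traced from the expression above):
            # contextTurns=10 -> 10; 100 -> 50 (cap); 0, None, or non-int -> 2.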
# Separate System and Non-System
system_messages = [m for m in messages if m.get("role") == "system"]
chat_messages = [m for m in messages if m.get("role") != "system"]
# Find the indices of User messages to determine run boundaries
user_indices = [i for i, m in enumerate(chat_messages) if m.get("role") == "user"]
user_turn_count = len(user_indices)
if user_turn_count > turn_limit:
cutoff_index = user_indices[-turn_limit]
recent_history = chat_messages[cutoff_index:]
else:
recent_history = chat_messages
# For single-turn requests (common during first-turn regenerate),
# using persisted summary can re-introduce stale assistant text.
# In this case, use fresh request messages only and rebuild summary from this turn.
is_single_user_turn = user_turn_count <= 1
# Force rebuild if it's the first turn OR if the user is editing/regenerating
should_rebuild_summary = bool(is_single_user_turn or request.is_editing)
# Inject summary only when history exceeds turn window and request is not rebuild flow.
should_inject_summary = bool(session_summary_text) and (user_turn_count > turn_limit) and (not should_rebuild_summary)
if not should_inject_summary and session_summary_text:
_log_verbose_info(
"Skipping session summary injection (within turn window or single-turn rebuild context)."
)
session_summary_text = None
old_summary_json = None
# 3. Inject Summary into System Prompt
if session_summary_text:
summary_prompt = (
"\n\nSession memory summary:\n"
"Here is a summary of the conversation so far. Use this to understand long-term context, "
"but prioritize the details in the recent messages below.\n"
f"{session_summary_text}\n"
)
# Inject into the LAST system message, or create a new one if none exist
if system_messages:
last_sys = system_messages[-1]
# Avoid appending if already present (defensive)
if "Session memory summary:" not in str(last_sys.get("content", "")):
new_content = str(last_sys.get("content", "")) + summary_prompt
                    # This mutates the original message dict; acceptable here because
                    # the request messages are not reused after this turn.
last_sys["content"] = new_content
else:
system_messages.append({"role": "system", "content": summary_prompt})
# Final Agent Input
agent_input = system_messages + recent_history
stream = agent.arun(
input=agent_input,
stream=True,
stream_events=True,
user_id=request.user_id,
session_id=request.conversation_id,
# Only pass explicit structured-output schema.
            # Do not fall back to response_format, otherwise {"type":"json_object"}
# may be treated as grammar and trigger provider-side grammar cache errors.
output_schema=request.output_schema,
)
# ================================================================
# Stream processing with HITL support
# ================================================================
async for run_event in stream:
_append_raw_event_log(
phase="main",
request=request,
run_id=getattr(run_event, "run_id", None),
run_event=run_event,
)
# ============================================================
# HITL: Check if agent paused for user input
# ============================================================
if hasattr(run_event, 'is_paused') and run_event.is_paused:
logger.info(f"Agent paused for HITL (run_id: {run_event.run_id})")
# Extract requirements
requirements = getattr(run_event, 'active_requirements', None) or getattr(run_event, 'requirements', None)
if requirements:
form_requirements = [req for req in requirements if _is_interactive_form_requirement(req)]
if not form_requirements:
logger.info("Agent paused without interactive_form; skipping HITL form handling")
yield DoneEvent(
content=full_content or "",
thought=full_thought.strip() or None,
sources=list(sources_map.values()) or None,
).model_dump()
return
# Save to Supabase
try:
paused_tools = getattr(run_event, "tools", None) or []
serialized_tools = [
tool.to_dict() if hasattr(tool, "to_dict") else tool
for tool in paused_tools
if tool is not None
]
serialized_requirements = [
req.to_dict() if hasattr(req, "to_dict") else req
for req in form_requirements
]
paused_run_output = {
"run_id": getattr(run_event, "run_id", None),
"session_id": getattr(run_event, "session_id", None)
or request.conversation_id,
"user_id": request.user_id,
"messages": messages or [],
"tools": serialized_tools,
"requirements": serialized_requirements,
"status": "PAUSED",
}
logger.info(
f"[HITL] Saving pending run_id={run_event.run_id} "
f"with database_provider={request.database_provider}"
)
hitl_storage = get_hitl_storage(request.database_provider)
saved = await hitl_storage.save_pending_run(
run_id=run_event.run_id,
requirements=form_requirements,
conversation_id=request.conversation_id,
user_id=request.user_id,
agent_model=request.model,
messages=messages,
run_output=paused_run_output,
)
if not saved:
raise RuntimeError("Failed to persist HITL pending run")
# Extract form fields for frontend
for req in form_requirements:
# Handle external execution (e.g., interactive_form with external_execution=True)
if (hasattr(req, 'needs_external_execution') and req.needs_external_execution) or \
(req.tool_execution and req.tool_execution.tool_name == "interactive_form"):
form_id, title, fields = _extract_interactive_form_payload(
req,
default_title="Please provide the following information",
)
# Send form_request event to frontend
yield FormRequestEvent(
run_id=run_event.run_id,
form_id=form_id,
title=title,
fields=fields
).model_dump()
                                # Fallback: handle traditional user input (e.g., get_user_input)
elif req.needs_user_input and req.user_input_schema:
# Convert from user_input_schema
form_id = None
title = "Please provide the following information"
fields = [
{
"name": field.name,
"type": self._map_field_type_to_frontend(field.field_type),
"label": field.description or field.name,
"required": True,
"value": field.value
}
for field in req.user_input_schema
]
# Send form_request event to frontend
yield FormRequestEvent(
run_id=run_event.run_id,
form_id=form_id,
title=title,
fields=fields
).model_dump()
# Send done event to indicate pause
yield DoneEvent(
content=full_content or "",
thought=full_thought.strip() or None,
sources=list(sources_map.values()) or None,
).model_dump()
_log_verbose_info(f"HITL pause successful, waiting for user submission (run_id: {run_event.run_id})")
return # Exit stream, wait for user to submit form
except Exception as e:
logger.error(f"Failed to save HITL state: {e}")
yield ErrorEvent(error=f"Failed to pause for form: {str(e)}").model_dump()
return
else:
logger.warning("Agent paused but no requirements found")
yield DoneEvent(
content=full_content or "",
thought=full_thought.strip() or None,
sources=list(sources_map.values()) or None,
).model_dump()
return
# ============================================================
# Normal streaming events (use stream_events for details)
# ============================================================
# Check if this is a detailed event (from stream_events=True)
if hasattr(run_event, 'event'):
# Extract agent info for Team mode (member vs leader identification)
current_agent_info = _extract_agent_info_from_event(
run_event,
leader_id=request.agent_id,
leader_name=request.agent_name,
leader_emoji=request.agent_emoji,
agent_metadata=agent_metadata,
)
# Log active agent switch in Team mode
if is_team_mode:
current_id = current_agent_info.get("agent_id")
if current_id != last_active_agent_id:
last_active_agent_id = current_id
active_name = current_agent_info.get("agent_name")
active_role = current_agent_info.get("agent_role")
active_model = current_agent_info.get("model")
active_provider = current_agent_info.get("provider")
logger.info(
f"[TEAM] >>> Active Agent Switch: {active_name} ({active_role}) "
f"| Model: {active_model} | Provider: {active_provider}"
)
# Apply current tracked status to info for text/thought events
current_agent_info["status"] = agent_statuses.get(current_id, "active")
if current_agent_info.get("agent_role") == "member":
trace_stream(
"member_event",
agent_id=current_agent_info.get("agent_id"),
agent_name=current_agent_info.get("agent_name"),
)
match run_event.event:
                        case RunEvent.run_started.value | TeamRunEvent.run_started.value:
if is_team_mode:
active_id = current_agent_info.get("agent_id")
active_name = current_agent_info.get("agent_name")
active_role = current_agent_info.get("agent_role")
active_model = current_agent_info.get("model")
active_provider = current_agent_info.get("provider")
logger.info(
f"[TEAM] >>> run_started: {active_name} ({active_role}) "
f"| Model: {active_model} | Provider: {active_provider}"
)
# Member starts -> Leader waits, Member active
if active_role == "member":
# Ensure leader is set to waiting when member starts
async for e in update_status_and_yield(request.agent_id, "waiting"):
yield e
async for e in update_status_and_yield(active_id, "active"):
yield e
else:
# Leader starts -> Leader active
async for e in update_status_and_yield(active_id, "active"):
yield e
continue
                        case TeamRunEvent.run_completed.value:
if is_team_mode:
active_id = current_agent_info.get("agent_id")
active_role = current_agent_info.get("agent_role")
if active_role == "member":
# Member finished -> Leader still waiting (until it resumes), Member ready
async for e in update_status_and_yield(active_id, "ready"):
yield e
continue
# Handle both Agent RunEvent and Team TeamRunEvent for content streaming
                        case RunEvent.run_content.value | TeamRunEvent.run_content.value:
raw_content_chunk = _extract_text_chunk(run_event)
raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol(
raw_content_chunk,
inline_tool_trace_depth,
inline_protocol_tail,
)
if had_protocol:
trace_stream(
"strip_tool_protocol",
depth=inline_tool_trace_depth,
tail_len=len(inline_protocol_tail),
cleaned_preview=_preview(raw_content_chunk),
)
raw_reasoning = _extract_reasoning_chunk(run_event, trace_fn=trace_stream)
content_segments, in_content_think_block = _split_content_by_think_tags(
raw_content_chunk,
in_content_think_block,
)
content_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "text"
)
inline_thought_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "thought"
)
reasoning = raw_reasoning
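# Some providers mirror the reasoning stream into the content stream; drop the
# reasoning chunk when it merely duplicates the content chunk.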
if reasoning and content_chunk and _is_reasoning_duplicate_of_content(
str(reasoning),
str(content_chunk),
):
trace_stream(
"suppress_reasoning_overlap",
event="run_content",
reasoning_preview=_preview(reasoning),
content_preview=_preview(content_chunk),
)
reasoning = ""
has_content_chunk = bool(content_chunk)
has_inline_thought = bool(inline_thought_chunk)
trace_stream(
"run_content",
has_content=has_content_chunk,
has_reasoning=bool(reasoning) or has_inline_thought,
reasoning_closed=reasoning_closed_for_current_cycle,
content_preview=_preview(content_chunk),
reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")),
)
has_any_thought = bool(reasoning) or has_inline_thought
if has_any_thought and reasoning_closed_for_current_cycle:
# Re-open reasoning phase (e.g. after tool call or interleaved model output).
reasoning_closed_for_current_cycle = False
in_reasoning_phase = False
should_break_next_thought = True
if reasoning:
for event in emit_thought_part(str(reasoning)):
yield event
if has_inline_thought:
for event in emit_thought_part(str(inline_thought_chunk)):
yield event
if content_chunk:
for e in process_text(content_chunk):
yield e
case RunEvent.reasoning_content_delta.value | TeamRunEvent.reasoning_content_delta.value:
raw_content_chunk = _extract_text_chunk(run_event)
raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol(
raw_content_chunk,
inline_tool_trace_depth,
inline_protocol_tail,
)
if had_protocol:
trace_stream(
"strip_tool_protocol",
depth=inline_tool_trace_depth,
tail_len=len(inline_protocol_tail),
cleaned_preview=_preview(raw_content_chunk),
)
raw_reasoning = _extract_reasoning_chunk(run_event)
content_segments, in_content_think_block = _split_content_by_think_tags(
raw_content_chunk,
in_content_think_block,
)
content_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "text"
)
inline_thought_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "thought"
)
reasoning = raw_reasoning
if reasoning and content_chunk and _is_reasoning_duplicate_of_content(
str(reasoning),
str(content_chunk),
):
trace_stream(
"suppress_reasoning_overlap",
event="reasoning_content_delta",
reasoning_preview=_preview(reasoning),
content_preview=_preview(content_chunk),
)
reasoning = ""
has_content_chunk = bool(content_chunk)
has_inline_thought = bool(inline_thought_chunk)
trace_stream(
"reasoning_delta",
has_content=has_content_chunk,
has_reasoning=bool(reasoning) or has_inline_thought,
reasoning_closed=reasoning_closed_for_current_cycle,
content_preview=_preview(content_chunk),
reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")),
)
has_any_thought = bool(reasoning) or has_inline_thought
if has_any_thought and reasoning_closed_for_current_cycle:
reasoning_closed_for_current_cycle = False
in_reasoning_phase = False
should_break_next_thought = True
if reasoning:
for event in emit_thought_part(str(reasoning)):
yield event
if has_inline_thought:
for event in emit_thought_part(str(inline_thought_chunk)):
yield event
if content_chunk:
for e in process_text(content_chunk):
yield e
case RunEvent.tool_call_started.value | TeamRunEvent.tool_call_started.value:
tool_event: ToolCallStartedEvent = run_event # type: ignore[assignment]
tool = tool_event.tool
if tool:
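# A tool boundary ends the current reasoning cycle, so reset the inline
# think/tool-protocol parser state before streaming resumes.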
in_reasoning_phase = False
should_break_next_thought = True
reasoning_closed_for_current_cycle = False
in_content_think_block = False
inline_tool_trace_depth = 0
inline_protocol_tail = ""
if getattr(tool, "tool_call_id", None):
tool_start_times[tool.tool_call_id] = time.time()
current_id = current_agent_info.get("agent_id")
# If the leader calls a tool, it's either an internal tool (code, etc.) or delegation.
# During the tool call itself, the agent is "active".
async for e in update_status_and_yield(current_id, "active"):
yield e
trace_stream(
"tool_call_started",
tool_name=getattr(tool, "tool_name", ""),
tool_call_id=getattr(tool, "tool_call_id", None),
)
current_text_index = len(full_content)
yield _build_tool_call_event(
tool,
current_text_index,
agent_info=current_agent_info
)
case RunEvent.tool_call_completed.value | TeamRunEvent.tool_call_completed.value:
tool_event: ToolCallCompletedEvent = run_event # type: ignore[assignment]
tool = tool_event.tool
if tool:
in_reasoning_phase = False
should_break_next_thought = True
reasoning_closed_for_current_cycle = False
in_content_think_block = False
inline_tool_trace_depth = 0
inline_protocol_tail = ""
duration_ms = None
if tool.tool_call_id and tool.tool_call_id in tool_start_times:
duration_ms = int((time.time() - tool_start_times[tool.tool_call_id]) * 1000)
trace_stream(
"tool_call_completed",
tool_name=tool.tool_name or "",
tool_call_id=tool.tool_call_id,
is_error=bool(tool.tool_call_error),
)
tool_result_event, output = _build_tool_result_event(
tool,
duration_ms,
self._normalize_tool_output,
agent_info=current_agent_info,
)
yield tool_result_event
self._collect_search_sources(output, sources_map)
case RunEvent.run_completed.value | TeamRunEvent.run_completed.value:
# Extract agent info to check if this is a member or leader
event_agent_info = _extract_agent_info_from_event(
run_event,
leader_id=request.agent_id,
leader_name=request.agent_name,
leader_emoji=request.agent_emoji,
agent_metadata=agent_metadata,
)
is_member_completion = event_agent_info.get("agent_role") == "member"
# In Team Mode, only terminate when the LEADER (no agent_id on event) completes.
# Member completions should just let the main loop continue.
if is_team_mode and is_member_completion:
active_id = event_agent_info.get("agent_id")
active_name = event_agent_info.get("agent_name")
active_model = event_agent_info.get("model")
active_provider = event_agent_info.get("provider")
logger.info(
f"[TEAM] Member {active_name} completed "
f"(Model: {active_model} | Provider: {active_provider}). "
"Continuing stream..."
)
# Ensure member is marked as ready if not already handled by TeamRunEvent.run_completed
async for e in update_status_and_yield(active_id, "ready"):
yield e
continue
# Leader completed
if is_team_mode:
async for e in update_status_and_yield(request.agent_id, "idle"):
yield e
final_content, output = _extract_completed_content_and_output(
run_event,
full_content,
)
yield DoneEvent(
content=final_content or "",
output=output,
thought=full_thought.strip() or None,
sources=list(sources_map.values()) or None,
).model_dump()
if request:
asyncio.create_task(self._maybe_optimize_memories(agent, request))
# 4. Trigger Async Session Summary Update
# Only if conversation_id exists (Main Chat Flow) AND summary is enabled
if request.conversation_id and request.enable_session_summary:
# Prepare summary lines:
# - Normal flow: incremental update with last user + new assistant
# - Regenerate/Edit (or single-turn rebuild): rebuild from current request context + new assistant
if should_rebuild_summary:
new_lines = [
m for m in messages
if m.get("role") in ("user", "assistant")
]
new_lines.append({"role": "assistant", "content": final_content})
else:
new_lines = []
last_user = next((m for m in reversed(messages) if m.get("role") == "user"), None)
if last_user:
new_lines.append(last_user)
new_lines.append({"role": "assistant", "content": final_content})
_log_verbose_info(f"Triggering async summary update for {request.conversation_id} with {len(new_lines)} messages (rebuild: {should_rebuild_summary}, is_editing: {request.is_editing})")
asyncio.create_task(update_session_summary(
conversation_id=request.conversation_id,
old_summary=old_summary_json,
new_messages=new_lines,
database_provider=request.database_provider,
memory_provider=request.memory_provider,
memory_model=request.memory_model,
memory_api_key=request.memory_api_key,
memory_base_url=request.memory_base_url,
summary_provider=request.summary_provider,
summary_model=request.summary_model,
summary_api_key=request.summary_api_key,
summary_base_url=request.summary_base_url,
rebuild_from_scratch=should_rebuild_summary,
))
return
case RunEvent.run_error.value | TeamRunEvent.run_error.value:
error_msg = _extract_best_error_message(run_event)
active_id = current_agent_info.get("agent_id")
if active_id:
async for e in update_status_and_yield(active_id, "error"):
yield e
yield ErrorEvent(error=error_msg).model_dump()
return
else:
# Simple-event fallback (no detailed event type): just check for content.
raw_content_chunk = _extract_text_chunk(run_event)
raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, _ = _strip_inline_tool_protocol(
raw_content_chunk,
inline_tool_trace_depth,
inline_protocol_tail,
)
raw_reasoning = _extract_reasoning_chunk(run_event)
content_segments, in_content_think_block = _split_content_by_think_tags(
raw_content_chunk,
in_content_think_block,
)
content_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "text"
)
inline_thought_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "thought"
)
if raw_reasoning:
for event in emit_thought_part(str(raw_reasoning)):
yield event
if inline_thought_chunk:
for event in emit_thought_part(str(inline_thought_chunk)):
yield event
if content_chunk:
for e in process_text(content_chunk):
yield e
except Exception as exc:
logger.error(f"Stream chat error: {exc}")
yield ErrorEvent(error=_extract_best_error_message(exc)).model_dump()
async def _continue_hitl_run(
self,
request: StreamChatRequest,
) -> AsyncGenerator[dict[str, Any], None]:
"""
Continue a paused HITL run after the user submits the form.
This method:
1. Retrieves pending requirements and saved run state from storage
2. Resolves the requirements with the submitted form values
3. Streams the continuation via agent.continue_run(), falling back to a rebuilt agent.arun() when the saved state cannot be restored
4. Cleans up the storage record (skipped when the run pauses again)
"""
try:
run_id = request.run_id
field_values = request.field_values or {}
_log_verbose_info(
f"[HITL] Continuing run_id={run_id!r} "
f"with database_provider={request.database_provider} "
f"and field_values={list(field_values.keys())}"
)
# 1. Fetch Session Summary from DB
session_summary_text = None
old_summary_json = None
if request.conversation_id:
try:
from ..models.db import DbFilter, DbQueryRequest
from .db_service import execute_db_async, get_db_adapter
adapter = get_db_adapter(request.database_provider)
if adapter:
req = DbQueryRequest(
providerId=adapter.config.id,
action="select",
table="conversations",
columns=["session_summary"],
filters=[DbFilter(op="eq", column="id", value=request.conversation_id)],
maybeSingle=True,
)
result = await execute_db_async(adapter, req)
if result.data and isinstance(result.data, dict):
row = result.data
raw_summary = row.get("session_summary")
if raw_summary:
if isinstance(raw_summary, str):
try:
old_summary_json = json.loads(raw_summary)
except (ValueError, json.JSONDecodeError):
pass
elif isinstance(raw_summary, dict):
old_summary_json = raw_summary
if old_summary_json:
session_summary_text = old_summary_json.get("summary")
except Exception as e:
logger.warning(f"Failed to fetch session summary in HITL flow: {e}")
# Retrieve pending requirements from HITL storage (provider-backed, e.g. Supabase)
hitl_storage = get_hitl_storage(request.database_provider)
pending = await hitl_storage.get_pending_run(run_id)
requirements = None
saved_messages = None
saved_run_output = None
if isinstance(pending, dict):
requirements = pending.get("requirements")
saved_messages = pending.get("messages")
saved_run_output = pending.get("run_output")
else:
requirements = pending
_log_verbose_info(
"[HITL] loaded pending payload: "
f"has_requirements={bool(requirements)}, "
f"messages_type={type(saved_messages).__name__ if saved_messages is not None else 'None'}, "
f"has_run_output={saved_run_output is not None}, "
f"run_output_type={type(saved_run_output).__name__ if saved_run_output is not None else 'None'}"
)
if not requirements:
logger.error(
f"[HITL] No pending run found for run_id={run_id!r} "
f"(database_provider={request.database_provider})"
)
yield ErrorEvent(error="Form session expired or not found").model_dump()
return
# Enable skills for the final, user-facing chat agent
request.enable_skills = True
# Get agent (same provider as original request)
agent = get_agent_for_provider(request)
_log_verbose_info(f"[HITL Continue] Agent instructions: {getattr(agent, 'instructions', None)}")
full_content = ""
full_thought = ""
sources_map: dict[str, Any] = {}
tool_start_times: dict[str, float] = {}
should_break_next_thought = False
in_reasoning_phase = False
reasoning_closed_for_current_cycle = False
in_content_think_block = False
inline_tool_trace_depth = 0
inline_protocol_tail = ""
stream_trace = _is_stream_trace_enabled()
paused_again = False # Flag to prevent cleanup when multi-form chaining occurs
stream_had_error = False
completed_content_fallback = ""
saw_terminal_completion = False
continuation_event_count = 0
last_event_name: str | None = None
last_event_type: str | None = None
last_event_run_id: str | None = None
# Current agent info for Team mode (updated per event)
current_agent_info: dict[str, Any] = {"agent_id": request.agent_id, "agent_name": request.agent_name}
def trace_stream(stage: str, **kwargs: Any) -> None:
if not stream_trace:
return
payload = ", ".join([f"{k}={v}" for k, v in kwargs.items()])
logger.info(f"[STREAM_TRACE][hitl] {stage} | {payload}")
def emit_thought_part(part: str):
nonlocal full_thought, full_content, should_break_next_thought, in_reasoning_phase, reasoning_closed_for_current_cycle
text = _strip_internal_tool_trace(str(part or ""))
if not text or not text.strip():
return
should_break_next_thought = False
in_reasoning_phase = True
full_thought += text
trace_stream("emit_reasoning", reasoning_preview=_preview(text))
current_text_index = len(full_content)
yield ThoughtEvent(
content=text,
text_index=current_text_index,
agent_id=current_agent_info.get("agent_id"),
agent_name=current_agent_info.get("agent_name"),
).model_dump(by_alias=True, exclude_none=True)
def process_text(text: str):
nonlocal full_content, in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle
clean_text = _strip_internal_tool_trace(text)
if clean_text:
has_visible_text = bool(clean_text.strip())
if has_visible_text:
in_reasoning_phase = False
should_break_next_thought = True
reasoning_closed_for_current_cycle = True
full_content += clean_text
yield TextEvent(
content=clean_text,
agent_id=current_agent_info.get("agent_id"),
agent_name=current_agent_info.get("agent_name"),
).model_dump(by_alias=True, exclude_none=True)
async def _iterate_run_stream(stream: Any):
"""
Normalize both async and sync Agno run streams into an async iterator.
"""
if hasattr(stream, "__aiter__"):
async for item in stream:
yield item
return
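# Sync fallback: drain the blocking iterator on a worker thread so the
# event loop is never blocked between items.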
iterator = iter(stream)
sentinel = object()
while True:
item = await asyncio.to_thread(lambda: next(iterator, sentinel))
if item is sentinel:
break
yield item
async def _stream_events(stream):
nonlocal full_content, full_thought, sources_map, tool_start_times, paused_again, stream_had_error
nonlocal in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle
nonlocal in_content_think_block, inline_tool_trace_depth, inline_protocol_tail
nonlocal completed_content_fallback, saw_terminal_completion
nonlocal continuation_event_count, last_event_name, last_event_type, last_event_run_id, current_agent_info
async for run_event in _iterate_run_stream(stream):
_append_raw_event_log(
phase="hitl_continuation",
request=request,
run_id=run_id,
run_event=run_event,
)
continuation_event_count += 1
last_event_type = type(run_event).__name__
last_event_name = str(getattr(run_event, "event", None) or last_event_type)
raw_event_run_id = getattr(run_event, "run_id", None)
last_event_run_id = str(raw_event_run_id) if raw_event_run_id else None
# Extract agent info for Team mode (though Team HITL is currently disabled)
current_agent_info = _extract_agent_info_from_event(
run_event,
leader_id=request.agent_id,
leader_name=request.agent_name,
leader_emoji=request.agent_emoji,
)
# When yield_run_output=True, acontinue_run may yield the final RunOutput object.
# Capture its canonical content as a robust fallback for providers that emit sparse events.
# IMPORTANT: Do NOT re-emit this content via process_text() if we already received
# streaming chunks (run_content events). Doing so would cause the full answer to be
# appended a second time, resulting in visible duplication in the UI.
# Only use RunOutput.content as a fallback when the stream produced nothing.
if isinstance(run_event, RunOutput):
saw_terminal_completion = True
completed_content_fallback, _ = _extract_completed_content_and_output(
run_event,
completed_content_fallback or full_content,
)
# Only emit if no content was streamed yet (sparse-event provider fallback).
if not full_content:
text_from_output = _extract_text_from_message_content(
getattr(run_event, "content", None)
).strip()
if text_from_output:
for e in process_text(text_from_output):
yield e
continue
# HITL Pause Check
if hasattr(run_event, 'is_paused') and run_event.is_paused:
logger.info(f"Agent paused again during continuation (multi-form chain, run_id: {run_id})")
# Extract new requirements
new_requirements = getattr(run_event, 'active_requirements', None) or getattr(run_event, 'requirements', None)
if new_requirements:
form_requirements = [req for req in new_requirements if _is_interactive_form_requirement(req)]
if form_requirements:
# Save the new form requirements (overwrites the previous pending record)
hitl_storage_multi = get_hitl_storage(request.database_provider)
saved = await hitl_storage_multi.save_pending_run(
run_id=run_id,
requirements=form_requirements,
conversation_id=request.conversation_id,
user_id=request.user_id,
agent_model=request.model,
messages=saved_messages, # Reuse saved messages
run_output=(
{
"run_id": getattr(run_event, "run_id", None) or run_id,
"session_id": getattr(run_event, "session_id", None)
or request.conversation_id,
"user_id": request.user_id,
"messages": saved_messages or [],
"tools": [
tool.to_dict() if hasattr(tool, "to_dict") else tool
for tool in (getattr(run_event, "tools", None) or [])
if tool is not None
],
"requirements": [
req.to_dict() if hasattr(req, "to_dict") else req
for req in form_requirements
],
"status": "PAUSED",
}
),
)
if not saved:
raise RuntimeError("Failed to persist chained HITL pending run")
# Extract form fields and notify frontend
for req in form_requirements:
if getattr(req, 'needs_external_execution', False) or \
(getattr(req, 'tool_execution', None) and req.tool_execution.tool_name == "interactive_form"):
form_id, title, fields = _extract_interactive_form_payload(
req,
default_title="Please provide additional information",
)
yield FormRequestEvent(
run_id=run_id,
form_id=form_id,
title=title,
fields=fields
).model_dump()
# Send partial done event
yield DoneEvent(
content=full_content or "",
thought=full_thought.strip() or None,
sources=list(sources_map.values()) or None,
).model_dump()
logger.info(f"Multi-form: saved second form, waiting for user (run_id: {run_id})")
paused_again = True # Mark as paused again to skip cleanup
return
# No interactive form requirements: report the unexpected pause and stop.
logger.warning(f"Agent paused again but no interactive_form found (run_id: {run_id})")
yield ErrorEvent(error="Agent paused unexpectedly").model_dump()
return
# Check if this is a detailed event (from stream_events=True or implicit)
if hasattr(run_event, 'event'):
match run_event.event:
case RunEvent.run_content.value:
raw_content_chunk = _extract_text_chunk(run_event)
raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol(
raw_content_chunk,
inline_tool_trace_depth,
inline_protocol_tail,
)
if had_protocol:
trace_stream(
"strip_tool_protocol",
depth=inline_tool_trace_depth,
tail_len=len(inline_protocol_tail),
cleaned_preview=_preview(raw_content_chunk),
)
raw_reasoning = _extract_reasoning_chunk(run_event, trace_fn=trace_stream)
content_segments, in_content_think_block = _split_content_by_think_tags(
raw_content_chunk,
in_content_think_block,
)
content_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "text"
)
inline_thought_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "thought"
)
reasoning = raw_reasoning
if reasoning and content_chunk and _is_reasoning_duplicate_of_content(
str(reasoning),
str(content_chunk),
):
trace_stream(
"suppress_reasoning_overlap",
event="run_content",
reasoning_preview=_preview(reasoning),
content_preview=_preview(content_chunk),
)
reasoning = ""
has_content_chunk = bool(content_chunk)
has_inline_thought = bool(inline_thought_chunk)
trace_stream(
"run_content",
has_content=has_content_chunk,
has_reasoning=bool(reasoning) or has_inline_thought,
reasoning_closed=reasoning_closed_for_current_cycle,
content_preview=_preview(content_chunk),
reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")),
)
has_any_thought = bool(reasoning) or has_inline_thought
if has_any_thought and reasoning_closed_for_current_cycle:
reasoning_closed_for_current_cycle = False
in_reasoning_phase = False
should_break_next_thought = True
if reasoning:
for event in emit_thought_part(str(reasoning)):
yield event
if has_inline_thought:
for event in emit_thought_part(str(inline_thought_chunk)):
yield event
if content_chunk:
for e in process_text(content_chunk):
yield e
trace_stream(
"emit_content",
reasoning_closed=reasoning_closed_for_current_cycle,
content_preview=_preview(content_chunk),
)
case RunEvent.reasoning_content_delta.value:
raw_content_chunk = _extract_text_chunk(run_event)
raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol(
raw_content_chunk,
inline_tool_trace_depth,
inline_protocol_tail,
)
if had_protocol:
trace_stream(
"strip_tool_protocol",
depth=inline_tool_trace_depth,
tail_len=len(inline_protocol_tail),
cleaned_preview=_preview(raw_content_chunk),
)
raw_reasoning = _extract_reasoning_chunk(run_event)
content_segments, in_content_think_block = _split_content_by_think_tags(
raw_content_chunk,
in_content_think_block,
)
content_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "text"
)
inline_thought_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "thought"
)
reasoning = raw_reasoning
if reasoning and content_chunk and _is_reasoning_duplicate_of_content(
str(reasoning),
str(content_chunk),
):
trace_stream(
"suppress_reasoning_overlap",
event="reasoning_content_delta",
reasoning_preview=_preview(reasoning),
content_preview=_preview(content_chunk),
)
reasoning = ""
has_content_chunk = bool(content_chunk)
has_inline_thought = bool(inline_thought_chunk)
trace_stream(
"reasoning_delta",
has_content=has_content_chunk,
has_reasoning=bool(reasoning) or has_inline_thought,
reasoning_closed=reasoning_closed_for_current_cycle,
content_preview=_preview(content_chunk),
reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")),
)
has_any_thought = bool(reasoning) or has_inline_thought
if has_any_thought and reasoning_closed_for_current_cycle:
reasoning_closed_for_current_cycle = False
in_reasoning_phase = False
should_break_next_thought = True
if reasoning:
for event in emit_thought_part(str(reasoning)):
yield event
if has_inline_thought:
for event in emit_thought_part(str(inline_thought_chunk)):
yield event
if content_chunk:
for e in process_text(content_chunk):
yield e
trace_stream(
"emit_content",
reasoning_closed=reasoning_closed_for_current_cycle,
content_preview=_preview(content_chunk),
)
case RunEvent.tool_call_started.value:
tool_event: ToolCallStartedEvent = run_event # type: ignore[assignment]
tool = tool_event.tool
if tool:
in_reasoning_phase = False
should_break_next_thought = True
reasoning_closed_for_current_cycle = False
in_content_think_block = False
inline_tool_trace_depth = 0
inline_protocol_tail = ""
if tool.tool_call_id:
tool_start_times[tool.tool_call_id] = time.time()
trace_stream(
"tool_call_started",
tool_name=tool.tool_name or "",
tool_call_id=tool.tool_call_id,
)
current_text_index = len(full_content)
yield _build_tool_call_event(tool, current_text_index)
case RunEvent.tool_call_completed.value:
tool_event: ToolCallCompletedEvent = run_event # type: ignore[assignment]
tool = tool_event.tool
if tool:
in_reasoning_phase = False
should_break_next_thought = True
reasoning_closed_for_current_cycle = False
in_content_think_block = False
inline_tool_trace_depth = 0
inline_protocol_tail = ""
duration_ms = None
if tool.tool_call_id and tool.tool_call_id in tool_start_times:
duration_ms = int((time.time() - tool_start_times[tool.tool_call_id]) * 1000)
trace_stream(
"tool_call_completed",
tool_name=tool.tool_name or "",
tool_call_id=tool.tool_call_id,
is_error=bool(tool.tool_call_error),
)
tool_result_event, output = _build_tool_result_event(
tool,
duration_ms,
self._normalize_tool_output,
)
yield tool_result_event
self._collect_search_sources(output, sources_map)
case RunEvent.run_completed.value:
saw_terminal_completion = True
completed_content_fallback, _ = _extract_completed_content_and_output(
run_event,
completed_content_fallback or full_content,
)
case RunEvent.run_error.value:
error_msg = _extract_best_error_message(run_event)
stream_had_error = True
yield ErrorEvent(error=error_msg).model_dump()
return
else:
# Simple-event fallback: no detailed event type, just check for content.
raw_content_chunk = _extract_text_chunk(run_event)
raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, _ = _strip_inline_tool_protocol(
raw_content_chunk,
inline_tool_trace_depth,
inline_protocol_tail,
)
raw_reasoning = _extract_reasoning_chunk(run_event)
content_segments, in_content_think_block = _split_content_by_think_tags(
raw_content_chunk,
in_content_think_block,
)
content_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "text"
)
inline_thought_chunk = "".join(
seg_text for seg_type, seg_text in content_segments if seg_type == "thought"
)
if raw_reasoning:
for event in emit_thought_part(str(raw_reasoning)):
yield event
if inline_thought_chunk:
for event in emit_thought_part(str(inline_thought_chunk)):
yield event
if content_chunk:
for e in process_text(content_chunk):
yield e
def _build_fallback_messages():
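# Rebuild an OpenAI-style tool history: replay each pending interactive_form
# call as an assistant tool_call, then append the submitted field values as
# the matching "tool" message so the transcript validates.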
if not saved_messages:
return None
updated_messages = list(saved_messages)
for req in requirements:
tool_exec = getattr(req, 'tool_execution', None)
tool_name = getattr(tool_exec, 'tool_name', None) if tool_exec else None
if tool_name != "interactive_form":
continue
tool_args = getattr(tool_exec, 'tool_args', {}) if tool_exec else {}
tool_call_id = getattr(req, "id", None) or tool_args.get("id") or f"form-{int(time.time() * 1000)}"
updated_messages.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": tool_call_id,
"type": "function",
"function": {
"name": "interactive_form",
"arguments": json.dumps(tool_args or {}),
}
}],
})
updated_messages.append({
"role": "tool",
"content": json.dumps(field_values),
"tool_call_id": tool_call_id,
})
return updated_messages
def _apply_field_values_to_requirements() -> list[Any]:
"""
Resolve pending HITL requirements with the submitted form payload.
Prefer Agno-native requirement resolution so we can use acontinue_run().
"""
resolved_requirements: list[Any] = []
serialized_values = json.dumps(field_values, ensure_ascii=False)
for req in requirements or []:
try:
if hasattr(req, "needs_external_execution") and req.needs_external_execution:
req.set_external_execution_result(serialized_values)
tool_exec = getattr(req, "tool_execution", None)
if tool_exec is not None and getattr(tool_exec, "result", None) is None:
tool_exec.result = serialized_values
elif hasattr(req, "needs_user_input") and req.needs_user_input:
req.provide_user_input(field_values)
elif hasattr(req, "needs_confirmation") and req.needs_confirmation:
req.confirm()
except Exception as req_err:
logger.warning(
f"[HITL] Failed to resolve requirement {getattr(req, 'id', None)} for run_id={run_id}: {req_err}"
)
resolved_requirements.append(req)
return resolved_requirements
def _build_continuation_agent_input(base_messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
messages = self._inject_local_time_context(list(base_messages), request, [])
system_messages = [m for m in messages if m.get("role") == "system"]
chat_messages = [m for m in messages if m.get("role") != "system"]
raw_turn_limit = request.context_turn_limit
turn_limit = (
max(1, min(50, int(raw_turn_limit)))
if isinstance(raw_turn_limit, int) and raw_turn_limit > 0
else 2
)
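# Keep only the last `turn_limit` user turns (everything from the cutoff user
# message onward); older turns are represented by the session summary below.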
user_indices = [i for i, m in enumerate(chat_messages) if m.get("role") == "user"]
user_turn_count = len(user_indices)
if user_turn_count > turn_limit:
cutoff_index = user_indices[-turn_limit]
recent_history = chat_messages[cutoff_index:]
else:
recent_history = chat_messages
should_inject_summary = bool(session_summary_text) and (user_turn_count > turn_limit)
if should_inject_summary:
summary_prompt = (
"\n\nSession memory summary:\n"
"Here is a summary of the conversation so far. Use this to understand long-term context, "
"but prioritize the details in the recent messages below.\n"
f"{session_summary_text}\n"
)
if system_messages:
last_sys = system_messages[-1]
if "Session memory summary:" not in str(last_sys.get("content", "")):
last_sys["content"] = str(last_sys.get("content", "")) + summary_prompt
else:
system_messages.append({"role": "system", "content": summary_prompt})
return system_messages + recent_history
resolved_requirements = _apply_field_values_to_requirements()
stream = None
try:
restored_run_output = None
if isinstance(saved_run_output, dict):
try:
restored_run_output = RunOutput.from_dict(dict(saved_run_output))
# Ensure external-execution tool results are concretely attached to
# run_response.tools before acontinue_run() processes updates.
if restored_run_output and isinstance(restored_run_output.tools, list):
serialized_values = json.dumps(field_values, ensure_ascii=False)
tool_result_by_id: dict[str, Any] = {}
for req in resolved_requirements or []:
tool_exec = getattr(req, "tool_execution", None)
if not tool_exec:
continue
tcid = getattr(tool_exec, "tool_call_id", None)
if tcid:
tool_result_by_id[str(tcid)] = getattr(tool_exec, "result", None)
for tool in restored_run_output.tools:
if getattr(tool, "result", None) is not None:
continue
tcid = getattr(tool, "tool_call_id", None)
if tcid and str(tcid) in tool_result_by_id:
tool.result = tool_result_by_id[str(tcid)]
continue
if getattr(tool, "tool_name", None) == "interactive_form":
tool.result = serialized_values
# OpenAI-compatible tool flow requires an assistant message
# with matching tool_calls before any tool message can appear.
if restored_run_output and isinstance(restored_run_output.tools, list):
existing_messages = (
list(restored_run_output.messages)
if isinstance(restored_run_output.messages, list)
else []
)
existing_tool_call_ids: set[str] = set()
for msg in existing_messages:
msg_tool_calls = getattr(msg, "tool_calls", None) or []
for tc in msg_tool_calls:
if isinstance(tc, dict) and tc.get("id"):
existing_tool_call_ids.add(str(tc.get("id")))
for tool in restored_run_output.tools:
tool_call_id = getattr(tool, "tool_call_id", None)
tool_name = getattr(tool, "tool_name", None) or "interactive_form"
if not tool_call_id or str(tool_call_id) in existing_tool_call_ids:
continue
tool_args = getattr(tool, "tool_args", None) or {}
existing_messages.append(
Message(
role="assistant",
content="",
tool_calls=[
{
"id": str(tool_call_id),
"type": "function",
"function": {
"name": str(tool_name),
"arguments": json.dumps(tool_args, ensure_ascii=False),
},
}
],
)
)
existing_tool_call_ids.add(str(tool_call_id))
restored_run_output.messages = existing_messages
if resolved_requirements:
restored_run_output.requirements = resolved_requirements
except Exception as restore_err:
logger.warning(
f"[HITL] Failed to restore RunOutput for run_id={run_id}, fallback to run_id path: {restore_err}"
)
_log_verbose_info(
f"[HITL] continuation restore mode: {'run_response' if restored_run_output is not None else 'run_id'} "
f"(has_saved_run_output={isinstance(saved_run_output, dict)})"
)
_log_verbose_info(
f"Running HITL continuation via continue_run (run_id: {run_id}, session_id: {request.conversation_id})"
)
if restored_run_output is not None:
stream = agent.continue_run(
run_response=restored_run_output,
stream=True,
stream_events=True,
yield_run_output=True,
user_id=request.user_id,
session_id=request.conversation_id,
output_schema=request.output_schema,
)
else:
stream = agent.continue_run(
run_id=run_id,
requirements=resolved_requirements,
stream=True,
stream_events=True,
yield_run_output=True,
user_id=request.user_id,
session_id=request.conversation_id,
output_schema=request.output_schema,
)
except Exception as continue_err:
logger.warning(
f"[HITL] continue_run failed for run_id={run_id}, fallback to rebuilt arun: {continue_err}"
)
fallback_messages = _build_fallback_messages()
if not fallback_messages:
yield ErrorEvent(error="Form session cannot be resumed (missing state)").model_dump()
return
agent_input = _build_continuation_agent_input(fallback_messages)
stream = agent.arun(
input=agent_input,
stream=True,
stream_events=True,
user_id=request.user_id,
session_id=request.conversation_id,
output_schema=request.output_schema,
)
async for event in _stream_events(stream):
yield event
_log_verbose_info(
"[HITL] continuation stream summary: "
f"run_id={run_id}, "
f"events={continuation_event_count}, "
f"last_event={last_event_name}, "
f"last_event_type={last_event_type}, "
f"last_event_run_id={last_event_run_id}, "
f"saw_terminal_completion={saw_terminal_completion}, "
f"stream_had_error={stream_had_error}, "
f"paused_again={paused_again}"
)
if stream_had_error:
logger.warning(f"HITL run {run_id} ended with stream error; skipping done/cleanup")
return
if not saw_terminal_completion:
logger.warning(
f"HITL run {run_id} stream ended without terminal completion event; skipping done/cleanup"
)
yield ErrorEvent(error="HITL continuation ended before completion").model_dump()
return
# Stream completed, send done event
yield DoneEvent(
content=full_content or completed_content_fallback,
thought=full_thought.strip() or None,
sources=list(sources_map.values()) or None,
).model_dump()
# Clean up HITL storage (skip if paused again for multi-form)
if not paused_again:
await hitl_storage.delete_pending_run(run_id)
logger.info(f"HITL run {run_id} completed and cleaned up")
else:
logger.info(f"HITL run {run_id} paused again (multi-form), skipping cleanup")
# 6. Trigger Async Session Summary Update
if request.conversation_id and not paused_again:
# For HITL resumption, extract the last turn's context from saved_messages
# (matching normal flow: only last user + assistant, not full history)
summary_messages = []
# Extract only the last user-assistant turn from saved_messages
if saved_messages:
last_user_idx = -1
for i in range(len(saved_messages) - 1, -1, -1):
if saved_messages[i].get("role") == "user":
last_user_idx = i
break
if last_user_idx >= 0:
# Include from last user message to end of saved_messages
# This captures: user question -> assistant form(s) -> any intermediate interactions
for msg in saved_messages[last_user_idx:]:
role = msg.get("role")
content = msg.get("content")
# Only include user/assistant messages with content for summary
if role in ("user", "assistant") and content:
summary_messages.append({"role": role, "content": content})
# Add the form submission as user input (provides structured data context)
form_submission_text = f"[Form Submitted] Values: {json.dumps(field_values)}"
summary_messages.append({"role": "user", "content": form_submission_text})
# Add the new assistant response (based on form data)
summary_messages.append({"role": "assistant", "content": full_content})
_log_verbose_info(f"Triggering async summary update for {request.conversation_id} (Resumed HITL flow, {len(summary_messages)} messages)")
asyncio.create_task(update_session_summary(
conversation_id=request.conversation_id,
old_summary=old_summary_json,
new_messages=summary_messages,
database_provider=request.database_provider,
memory_provider=request.memory_provider,
memory_model=request.memory_model,
memory_api_key=request.memory_api_key,
memory_base_url=request.memory_base_url,
summary_provider=request.summary_provider,
summary_model=request.summary_model,
summary_api_key=request.summary_api_key,
summary_base_url=request.summary_base_url,
rebuild_from_scratch=False, # HITL resumption is usually incremental
))
except Exception as exc:
import traceback
error_details = traceback.format_exc()
logger.error(f"HITL continuation error: {exc}\n{error_details}")
yield ErrorEvent(error=_extract_best_error_message(exc)).model_dump()
def _collect_enabled_tool_names(self, request: StreamChatRequest) -> set[str]:
names: list[str] = []
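# tool_ids are intentionally skipped for the gemini provider (presumably wired
# through a separate native-tool path); explicit tool defs still count below.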
if request.provider != "gemini":
for tool_id in request.tool_ids or []:
names.append(resolve_tool_name(str(tool_id)))
for tool_def in request.tools or []:
if hasattr(tool_def, "model_dump"):
tool_def = tool_def.model_dump()
if not isinstance(tool_def, dict):
continue
name = tool_def.get("function", {}).get("name") or tool_def.get("name")
if name:
names.append(resolve_tool_name(str(name)))
for user_tool in request.user_tools or []:
if hasattr(user_tool, "name") and user_tool.name:
names.append(str(user_tool.name))
return set(names)
def _inject_local_time_context(
self,
messages: list[dict[str, Any]],
request: StreamChatRequest,
pre_events: list[dict[str, Any]],
) -> list[dict[str, Any]]:
if not messages:
return messages
timezone = request.user_timezone or "UTC"
locale = request.user_locale or "en-US"
time_result = self._compute_local_time(timezone, locale)
try:
local_date = datetime.fromisoformat(str(time_result.get("iso"))).strftime("%Y-%m-%d")
except Exception:
local_date = str(time_result.get("formatted", "")).split(" ")[0] or datetime.now().strftime(
"%Y-%m-%d"
)
tz_label = str(time_result.get("timezone") or timezone)
note = (
f"\n\n[Time note for this query]\n"
f"Note: local date is {local_date} ({tz_label}). "
"Interpret relative time terms using this date."
)
updated: list[dict[str, Any]] = []
for msg in messages:
if not isinstance(msg, dict) or msg.get("role") != "user":
updated.append(msg)
continue
content = msg.get("content")
if isinstance(content, str):
if "[Time note for this query]" in content:
updated.append(msg)
else:
updated.append({**msg, "content": f"{content}{note}"})
continue
if isinstance(content, list):
has_note = False
for part in content:
if isinstance(part, dict):
marker = "[Time note for this query]"
if marker in str(part.get("text", "")) or marker in str(part.get("content", "")):
has_note = True
break
if has_note:
updated.append(msg)
else:
updated.append({**msg, "content": [*content, {"type": "text", "text": note}]})
continue
updated.append(msg)
return updated
def _compute_local_time(self, timezone: str, locale: str) -> dict[str, Any]:
try:
tzinfo = ZoneInfo(timezone)
now = datetime.now(tzinfo)
except Exception:
now = datetime.now()
return {
"timezone": timezone,
"locale": locale,
"formatted": now.strftime("%Y-%m-%d %H:%M:%S"),
"iso": now.isoformat(),
"now": now,
}
def _inject_tool_guidance(
self,
messages: list[dict[str, Any]],
enabled_tools: set[str],
request: Any | None = None,
) -> list[dict[str, Any]]:
if not enabled_tools:
return messages
updated = list(messages)
system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
no_tool_narration_guidance = (
"\n\n[OUTPUT DIRECTIVES]\n"
"1. The main text (Answer) must contain ONLY the final helpful content and necessary explanations for the user.\n"
"2. In the main text, NEVER describe that you are going to, are currently, or have already called any tools, searched, browsed, retrieved memory, or queried databases. These are internal traces (Trace).\n"
"3. In the main text, do NOT refer to yourself performing actions (e.g., \"Let me check\", \"I will search\", \"I have retrieved\").\n"
" Instead, directly present results as established information.\n"
"4. If citing sources, use neutral phrasing such as: \"According to available data\", \"Based on public information\", \"According to the returned data\".\n"
" Never mention tool names or the calling process.\n"
"5. If information is insufficient, directly state the missing gap and ask clarifying questions.\n"
" Do NOT say \"Let me check again\" or similar transitional action phrases.\n"
"6. Once you start presenting the final answer, do not switch back to planning, searching, or tool-calling language.\n"
"7. The final answer should begin naturally with the content itself, without meta commentary or transitional phrases.\n"
)
updated = self._append_system_message(updated, no_tool_narration_guidance, system_index)
system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
if "interactive_form" in enabled_tools:
form_guidance = (
"\n[TOOL USE GUIDANCE]\n"
"When you need to collect structured information from the user (e.g. preferences, requirements, "
"booking details), use the 'interactive_form' tool.\n"
"CRITICAL: DO NOT list questions in text or markdown. YOU MUST USE the 'interactive_form' tool to "
"display fields.\n"
"CRITICAL: If the user explicitly asks you to confirm something via a form, interactive form, or form tool, "
"you MUST call 'interactive_form' in this response instead of asking in plain text.\n"
"CRITICAL: If you need approval before installing a dependency, you MUST use 'interactive_form'. "
"Do NOT ask yes/no approval questions in normal prose when the tool is available.\n"
"CRITICAL: For approval forms, ask only for information the model does NOT already know. "
"If skill_id or package_name are already known from the current tool result or failure context, "
"do NOT include text inputs for them in the form.\n"
"CRITICAL: For dependency installation approval, prefer a single required field such as "
"'approve_install', and place the package name in the form title or description.\n"
"Keep forms concise (3-6 fields).\n\n"
"[SIMPLIFIED PAYLOAD]\n"
"You may use a minimal payload to reduce tool-call size.\n"
"- 'id' and 'title' are optional.\n"
"- Each field may be minimal (e.g., {'name':'budget'}) or even a short string label.\n"
"- Backend will auto-fill missing label/type defaults.\n\n"
"[MANDATORY TEXT-FIRST RULE]\n"
"CRITICAL: You MUST output meaningful introductory text BEFORE calling 'interactive_form'.\n"
"- NEVER call 'interactive_form' as the very first thing in your response\n"
"- ALWAYS explain the context, acknowledge the user's request, or provide guidance BEFORE the form\n"
"- Minimum: Output at least 1-2 sentences before the form call\n"
'- Example: "I can help you with that. To provide the best recommendation, please share some '
'details below:"\n\n'
"[SINGLE FORM PER RESPONSE]\n"
"CRITICAL: You may call 'interactive_form' ONLY ONCE per response. Do NOT call it multiple times in "
"the same answer.\n"
"If you need to collect information, design ONE comprehensive form that gathers all necessary "
"details at once.\n\n"
"[MULTI-TURN INTERACTIONS]\n"
"1. If the information from a submitted form is insufficient, you MAY present another "
"'interactive_form' in your NEXT response (after the user submits the first form).\n"
"2. LIMIT: Use at most 2-3 forms total across the entire conversation. Excessive questioning "
"frustrates users.\n"
"3. INTERLEAVING: You can place the form anywhere in your response. Output introductory text FIRST "
"(e.g., \"I can help with that. Please provide some details below:\"), then call 'interactive_form' "
"once.\n"
"4. If the user has provided enough context through previous forms, proceed directly to the final "
"answer without requesting more information."
)
updated = self._append_system_message(updated, form_guidance, system_index)
system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
search_tools_requiring_citations = {
"Tavily_web_search",
"Tavily_academic_search",
"web_search_using_tavily",
"web_search",
"search_news",
"search_exa",
"search_arxiv_and_return_articles",
"search_wikipedia",
}
if enabled_tools.intersection(search_tools_requiring_citations):
citation_prompt = (
"\n\n[IMPORTANT] You have access to search tools. When you use them to answer a question, "
"you MUST cite the search results in your answer using the format [1], [2], etc., "
"corresponding to the index of the search result provided in the tool output. Do not fabricate "
"citations."
)
updated = self._append_system_message(updated, citation_prompt, system_index)
if "local_time" in enabled_tools:
local_time_guidance = (
"\n\n[TIME CONTEXT GUIDANCE]\n"
"A local-date note is already appended to each user query before model execution.\n"
"Do not call local_time again unless the user explicitly asks to refresh/recheck time."
)
updated = self._append_system_message(updated, local_time_guidance, system_index)
# [REINFORCED] System-level long-term memory guidance for the internal agent-memory skill
if getattr(request, "enable_long_term_memory", False):
memory_guidance = (
"\n\n[SYSTEM-LEVEL META-RULE: LONG-TERM MEMORY]\n"
"You are equipped with a persistent memory layer ('agent-memory'). This is a core infrastructure skill that overrides situational constraints.\n"
"1) MANDATORY PREREQUISITE: If a query involves personal history, preferences, or prior facts, searching memory is a mandatory PREREQUISITE. "
"This MUST be executed BEFORE any other skill logic (e.g., Roleplay, Image Search) to avoid factual inconsistency.\n"
"2) PERSISTENCE AGAINST OVERRIDES: This meta-rule remains in effect even if other skills (like roleplay) demand immediate responses. "
"Retrieving ground-truth user context is the first step of all processing.\n"
"3) USAGE: Call 'execute_skill_script' with skill_id='agent-memory'. Factual alignment via memory retrieval is non-negotiable."
)
updated = self._append_system_message(updated, memory_guidance, system_index)
return updated
def _append_system_message(
self,
messages: list[dict[str, Any]],
addition: str,
system_index: int,
) -> list[dict[str, Any]]:
updated = list(messages)
if system_index != -1:
updated[system_index] = {
**updated[system_index],
"content": f"{updated[system_index].get('content', '')}{addition}",
}
else:
updated.insert(0, {"role": "system", "content": addition})
return updated
def _normalize_tool_output(self, output: Any) -> Any:
if hasattr(output, "model_dump"):
try:
return output.model_dump()
except Exception:
return str(output)
if isinstance(output, dict):
return output
if isinstance(output, list):
return [self._normalize_tool_output(item) for item in output]
return output
def _collect_search_sources(self, result: Any, sources_map: dict[str, Any]) -> None:
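# Accepts either a bare list of result dicts or a payload keyed by
# "results"/"items"/"data"/etc., e.g. {"results": [{"url": ..., "title": ..., "content": ...}]}.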
def _extract_results(payload: Any) -> list[dict[str, Any]]:
if isinstance(payload, list):
return [item for item in payload if isinstance(item, dict)]
if isinstance(payload, dict):
for key in ("results", "items", "data", "sources", "articles", "news", "papers"):
value = payload.get(key)
if isinstance(value, list):
return [item for item in value if isinstance(item, dict)]
return []
results = _extract_results(result)
if not results:
return
for item in results:
url = (
item.get("url")
or item.get("link")
or item.get("uri")
or item.get("source")
or item.get("href")
)
if not url or url in sources_map:
continue
title = (
item.get("title")
or item.get("name")
or item.get("headline")
or item.get("paper_title")
or "Unknown Source"
)
snippet = (
item.get("content")
or item.get("snippet")
or item.get("summary")
or item.get("abstract")
or ""
)
sources_map[url] = SourceEvent(
uri=url,
title=title,
snippet=str(snippet)[:200],
).model_dump()
async def _maybe_optimize_memories(self, agent: Agent, request: StreamChatRequest) -> None:
return
def _map_field_type_to_frontend(self, field_type: Any) -> str:
"""
Map Python/Agno field types to frontend form types.
Args:
field_type: Python type (class or string)
Returns:
Frontend form field type (text, number, checkbox, etc.)
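Examples:
_map_field_type_to_frontend(int) -> "number"
_map_field_type_to_frontend("bool") -> "checkbox"
_map_field_type_to_frontend("set") -> "text" (unknown types default to "text")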
"""
# Handle cases where field_type is a class/type instead of a string
field_type_str = ""
if isinstance(field_type, type):
field_type_str = field_type.__name__
elif not isinstance(field_type, str):
field_type_str = str(field_type)
else:
field_type_str = field_type
type_mapping = {
"str": "text",
"int": "number",
"float": "number",
"bool": "checkbox",
"date": "date",
"time": "time",
"datetime": "datetime",
"list": "text",
"dict": "textarea",
}
return type_mapping.get(field_type_str.lower(), "text")
_stream_chat_service: StreamChatService | None = None
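# Lazily-created module-level singleton shared across requests; callers obtain
# it via get_stream_chat_service() below.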
def get_stream_chat_service() -> StreamChatService:
global _stream_chat_service
if _stream_chat_service is None:
_stream_chat_service = StreamChatService()
return _stream_chat_service
|