# updated_mongodb_p / tests /test_streaming_learning.py
# ape-dev
# Add multi-domain RAG (Chroma) with auto-detected domain
# 6420ca7
from __future__ import annotations
def test_streaming_turns_seed_and_flush_bandit_signals(monkeypatch):
    """Run two streaming turns and check the first response's reward lifecycle.

    The first turn must leave its stored response in ``PENDING`` state with a
    ``format_compliance_pass`` signal queued first; the follow-up turn in the
    same session must move that same response to ``APPLIED`` with a
    normalized reward of ``0.5``.
    """
    import mongomock
    import pymongo

    # Swap in the in-memory client before any ``ape`` module is imported.
    # NOTE(review): presumably so a ``from pymongo import MongoClient`` inside
    # the package binds to the mock on first import -- confirm.
    monkeypatch.setattr(pymongo, "MongoClient", mongomock.MongoClient)

    import ape.orchestrator as orch_mod
    import ape.store.mongo_store as mongo_store_mod
    from ape.config import seed_all
    from ape.orchestrator import ApeOrchestrator
    from ape.store import MongoStore

    # Also patch the name held by the store module itself.
    # NOTE(review): looks like this covers the module having been imported
    # earlier in the test session -- confirm.
    monkeypatch.setattr(mongo_store_mod, "MongoClient", mongomock.MongoClient)

    mock_backed_store = MongoStore(
        uri="mongodb://localhost:27017",
        db_name="ape_streaming_test",
    )
    seed_all(mock_backed_store, domain="finance")

    # Parameter names of both fakes mirror the originals in case the
    # orchestrator passes arguments by keyword.
    def fake_classify(client, model, query, history, prev_format=None):
        """Return one fixed classification regardless of the inputs."""
        canned = {
            "intent": "Definitional",
            "intent_confidence": 0.99,
            "domain": "finance",
            "topic": "roth_ira",
            "signal": "deeper_question",
        }
        return canned

    def fake_stream(client, model, query, strategy, history, context=""):
        """Yield a single text delta followed by the terminal ``done`` event."""
        delta_event = {"type": "delta", "text": "hello"}
        yield delta_event
        done_event = {
            "type": "done",
            "rendered_format": "paragraph",
            "response": "hello",
        }
        yield done_event

    monkeypatch.setattr(orch_mod, "classify_and_detect", fake_classify)
    monkeypatch.setattr(orch_mod, "generate_response_stream", fake_stream)

    orchestrator = ApeOrchestrator(
        client=object(),
        model="fake",
        store=mock_backed_store,
        domain="finance",
    )

    # Turn 1: fully consume the stream; the last event carries the ids.
    opening_events = list(
        orchestrator.handle_turn_streaming("alice", "What is a Roth IRA?")
    )
    closing_event = opening_events[-1]
    first_response_id = closing_event["response_id"]
    first_session_id = closing_event["session_id"]

    stored_after_first_turn = mock_backed_store.get_response(first_response_id)
    assert stored_after_first_turn["reward_status"] == "PENDING"
    leading_signal = stored_after_first_turn["pending_signals"][0]
    assert leading_signal["signal"] == "format_compliance_pass"

    # Turn 2: same session; the stream is drained only for its side effects.
    follow_up_stream = orchestrator.handle_turn_streaming(
        "alice",
        "Can you explain more?",
        session_id=first_session_id,
    )
    for _ in follow_up_stream:
        pass

    finalized = mock_backed_store.get_response(first_response_id)
    assert finalized["reward_status"] == "APPLIED"
    assert finalized["normalized_reward"] == 0.5
    assert finalized["signal"] in {"pattern_engaged_positive", "format_compliance_pass"}