Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from dateutil import parser as dtparser | |
| from server.agent import run_agent, run_agent_stream | |
| from server.schema import ActionPlan | |
| def test_stream_yields_partials_then_plan(): | |
| steps = 0 | |
| final = None | |
| for acc, plan in run_agent_stream("Alex: lunch 1pm tomorrow?"): | |
| steps += 1 | |
| assert isinstance(acc, str) | |
| if plan is not None: | |
| final = plan | |
| assert steps > 1 # streamed in chunks | |
| assert isinstance(final, ActionPlan) | |
| assert len(final.events) == 1 | |
| # start/end parse as ISO 8601 | |
| dtparser.isoparse(final.events[0].start) | |
| dtparser.isoparse(final.events[0].end) | |
| def test_no_time_thread_has_no_events(): | |
| plan = run_agent("lol that meme killed me") | |
| assert isinstance(plan, ActionPlan) | |
| assert plan.events == [] | |
| assert plan.reply_draft # still drafts a reply | |
| def test_run_agent_marks_images(): | |
| # stub ignores image content but the path should not error | |
| plan = run_agent("dinner 6pm", images=["data:image/png;base64,xx"]) | |
| assert isinstance(plan, ActionPlan) | |