"""Manual verification script for the listing publish flow.

Runs two turns against the agent graph on a single thread: one that asks for
a listing to be created, and one that sends the ``publish`` command. It then
checks the publish markers found in the resulting state's ``temp_data``.
"""

import asyncio
import os
import sys
from datetime import datetime

# Add project root (the parent of this file's directory) to the path.
# This must happen before the ``app`` imports below.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app.ai.agent.graph import get_aida_graph  # noqa: E402
from app.ai.agent.schemas import ListingDraft  # noqa: E402,F401


def _run_config(session_id):
    """Build the per-invocation graph config bound to *session_id*."""
    return {"configurable": {"thread_id": session_id}, "recursion_limit": 50}


async def verify_publish():
    """Drive the create -> publish flow and report whether it succeeded.

    Returns:
        bool: ``True`` when the publish turn yields ``action == "published"``,
        ``replace_last_message is True`` and a draft UI status of
        ``"published"``; ``False`` when draft creation or any of those
        checks fails.
    """
    print("🚀 Verifying Publish Flow...")

    graph = get_aida_graph()
    session_id = f"verify_pub_{int(datetime.now().timestamp())}"
    user_id = "tester"
    config = _run_config(session_id)

    # A ready-to-publish state cannot easily be injected into the graph's
    # memory without running the flow, so the flow is run quickly instead.

    # 1. Start listing
    print("\n1. creating listing...")
    state_1 = await graph.ainvoke(
        {
            "user_id": user_id,
            "session_id": session_id,
            "user_role": "landlord",
            "last_user_message": "List a 2-bed in Lagos for 50k",
            "start_new_session": True,
        },
        config=config,
    )

    if not state_1.get("listing_draft"):
        print("❌ Draft creation failed")
        return False
    print(" Draft created successfully")

    # 2. Publish
    print("\n2. Sending 'publish' command...")
    state_2 = await graph.ainvoke(
        {
            "user_id": user_id,
            "session_id": session_id,
            "user_role": "landlord",
            "last_user_message": "publish",
        },
        config=config,
    )

    # ``or {}`` (rather than a .get default) also covers keys that are present
    # but explicitly None, so a missing payload is reported as a failed
    # verification instead of crashing with AttributeError.
    temp_data = state_2.get("temp_data") or {}
    action = temp_data.get("action")
    replace = temp_data.get("replace_last_message")
    draft_ui = temp_data.get("draft_ui") or {}
    status = draft_ui.get("status")

    print(f" Action: {action} (Expected: published)")
    print(f" Replace Flag: {replace} (Expected: True)")
    print(f" Draft UI Status: {status} (Expected: published)")

    passed = action == "published" and replace is True and status == "published"
    if passed:
        print("\n✅ PUBLISH VERIFICATION PASSED!")
    else:
        print("\n❌ PUBLISH VERIFICATION FAILED")
    return passed


if __name__ == "__main__":
    # Propagate the outcome as the process exit status so shells and CI can
    # detect a failed verification.
    sys.exit(0 if asyncio.run(verify_publish()) else 1)