agentforge / test /test_connection.py
Tahasaif3's picture
'code'
fd1a435
import asyncio
import json
import httpx
import socketio
import time
from firebase_admin import credentials, db, initialize_app
import os
from dotenv import load_dotenv
load_dotenv()
database_url = os.getenv("FIREBASE_DB_URL")
cred = credentials.Certificate("firebase/serviceAccountKey.json")
initialize_app(cred, {
'databaseURL': database_url
})
BASE_URL = "http://127.0.0.1:8000"
async def test_full_flow():
print("TESTING DAY 1 CONNECTION...\n")
print("1. Creating session...")
resp = httpx.post(f"{BASE_URL}/start")
data = resp.json()
session_id = data["session_id"]
print(f" Session ID: {session_id}")
print("\n2. Connecting to Socket.IO...")
sio = socketio.AsyncClient()
@sio.event
async def connect():
print(" Connected!")
await sio.emit('join', {'session_id': session_id})
@sio.on('log_update')
async def on_log(data):
print(f" [{data['type']}] {data['message']}")
await sio.connect(BASE_URL)
print(" Joined room")
# 3. Send Query
print("\n3. Sending query...")
httpx.post(
f"{BASE_URL}/session/{session_id}/query",
json={
"query": "I want an agent for my online pharmacy that checks prescription validity and sends SMS confirmations",
"api_key": os.getenv("OPENAI_API_KEY")
}
)
print("\n4. Receiving live logs...\n")
await asyncio.sleep(8)
print("\n5. Verifying Firebase Realtime DB...")
ref = db.reference(f'sessions/{session_id}')
fb_data = ref.get()
print(f" Status: {fb_data.get('status')}")
print(f" Logs count: {len(fb_data.get('logs', []))}")
print(f" User query: {fb_data.get('user_query')}")
await sio.disconnect()
print("\nDAY 1 CONNECTION: PASSED!")
print(f" Firebase URL: https://console.firebase.google.com/project/_/database/realtime/data/sessions/{session_id}")
if __name__ == "__main__":
asyncio.run(test_full_flow())