Spaces:
Sleeping
Sleeping
File size: 1,968 Bytes
import asyncio
import json
import httpx
import socketio
import time
from firebase_admin import credentials, db, initialize_app
import os
from dotenv import load_dotenv
# Load variables from a local .env file (if any) into the process environment
# before anything below reads them.
load_dotenv()

# Base URL of the backend this script talks to over HTTP and Socket.IO.
BASE_URL = "http://127.0.0.1:8000"

# Initialise the default Firebase app from the service-account key on disk,
# pointing it at the Realtime Database URL taken from FIREBASE_DB_URL.
database_url = os.environ.get("FIREBASE_DB_URL")
cred = credentials.Certificate("firebase/serviceAccountKey.json")
initialize_app(cred, options={"databaseURL": database_url})
async def test_full_flow():
    """Run the "Day 1" end-to-end smoke check against the backend at BASE_URL.

    Steps, in order:

    1. POST ``{BASE_URL}/start`` and read ``session_id`` from the JSON reply.
    2. Open a Socket.IO connection to ``BASE_URL``. The ``connect`` handler
       emits ``join`` with the session id, and every ``log_update`` event is
       printed as it arrives.
    3. POST a sample query, together with ``OPENAI_API_KEY`` from the
       environment, to ``/session/{session_id}/query``.
    4. Sleep 8 seconds so incoming log events can be printed.
    5. Read ``sessions/{session_id}`` from the Firebase Realtime Database and
       print its ``status``, the number of ``logs`` and its ``user_query``.

    The Socket.IO client is always disconnected once it has connected, even
    if a later step fails.

    Raises:
        httpx.HTTPStatusError: If either HTTP request returns a 4xx/5xx
            status.
        RuntimeError: If Firebase returns no data for the session.
    """
    print("TESTING DAY 1 CONNECTION...\n")

    # The async HTTP client keeps the event loop free while requests are in
    # flight; a blocking call here would stall the Socket.IO client's
    # background work for the duration of the request.
    async with httpx.AsyncClient() as client:
        print("1. Creating session...")
        resp = await client.post(f"{BASE_URL}/start")
        resp.raise_for_status()
        session_id = resp.json()["session_id"]
        print(f" Session ID: {session_id}")

        print("\n2. Connecting to Socket.IO...")
        sio = socketio.AsyncClient()

        @sio.event
        async def connect():
            # Ask the server to add this client to the session via 'join'.
            print(" Connected!")
            await sio.emit('join', {'session_id': session_id})

        @sio.on('log_update')
        async def on_log(data):
            print(f" [{data['type']}] {data['message']}")

        await sio.connect(BASE_URL)
        try:
            print(" Joined room")

            # 3. Send Query
            print("\n3. Sending query...")
            query_resp = await client.post(
                f"{BASE_URL}/session/{session_id}/query",
                json={
                    "query": "I want an agent for my online pharmacy that checks prescription validity and sends SMS confirmations",
                    "api_key": os.getenv("OPENAI_API_KEY")
                }
            )
            query_resp.raise_for_status()

            print("\n4. Receiving live logs...\n")
            await asyncio.sleep(8)

            print("\n5. Verifying Firebase Realtime DB...")
            ref = db.reference(f'sessions/{session_id}')
            # ref.get() is a blocking call; run it in the default executor so
            # the event loop keeps running while it waits.
            loop = asyncio.get_running_loop()
            fb_data = await loop.run_in_executor(None, ref.get)
            if fb_data is None:
                raise RuntimeError(
                    f"No Firebase data found at sessions/{session_id}"
                )
            print(f" Status: {fb_data.get('status')}")
            print(f" Logs count: {len(fb_data.get('logs', []))}")
            print(f" User query: {fb_data.get('user_query')}")
        finally:
            # Release the socket on both the success and the failure path.
            await sio.disconnect()

    print("\nDAY 1 CONNECTION: PASSED!")
    print(f" Firebase URL: https://console.firebase.google.com/project/_/database/realtime/data/sessions/{session_id}")
# Script entry point: when executed directly, run the async smoke test to
# completion with asyncio.run().
if __name__ == "__main__":
    asyncio.run(test_full_flow())