KES-Hack / tests /test_ingest.py
Meshyboi's picture
Upload 86 files
214209a verified
import asyncio
from starlette.datastructures import FormData
from src.services.ingest import handle_incoming_message
async def _call_ingest(payload: dict, channel: str = "whatsapp"):
form_data = FormData(payload)
return await handle_incoming_message(form_data, channel=channel)
def test_ingest_returns_media_ack_for_media_message():
payload = {
"From": "whatsapp:+911234567890",
"NumMedia": "1",
"MediaUrl0": "https://example.com/image.jpg",
"MediaContentType0": "image/jpeg",
}
response = asyncio.run(_call_ingest(payload))
assert response == "Received your media. Analyzing for deepfakes and security threats..."
def test_ingest_returns_url_ack_for_url_text():
payload = {
"From": "whatsapp:+911234567890",
"NumMedia": "0",
"Body": "Is this safe? http://suspicious-link.com",
}
response = asyncio.run(_call_ingest(payload))
assert response == "Extracting URL. Verifying safety against phishing databases..."
def test_ingest_returns_generic_ack_for_plain_text():
payload = {
"From": "whatsapp:+911234567890",
"NumMedia": "0",
"Body": "Hello, please scan this",
}
response = asyncio.run(_call_ingest(payload))
assert response.startswith("Received text analysis request. Security scan initialized")