# deepfake-hunter/zoom_plugin/webhook_handler.py
# sammoftah's picture
# Deploy Deepfake Hunter
# faaaf38 verified
"""
Deepfake Hunter - Zoom Webhook Handler
Handles Zoom meeting webhooks and processes video streams for deepfake detection.
Author: Deepfake Hunter Team
License: MIT
"""
import os
import json
import hmac
import hashlib
from typing import Dict, Any, Optional
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException, Header
from pydantic import BaseModel
import uvicorn
from loguru import logger
# Initialize FastAPI
# The route decorators below (@app.get / @app.post) register handlers on this instance.
app = FastAPI(title="Deepfake Hunter - Zoom Webhook Handler")
# Configuration
# Every setting is read from the environment once, at import time. The
# fallback strings are placeholders, not usable credentials.
# ZOOM_WEBHOOK_SECRET is the HMAC key used for signature verification and for
# answering the URL-validation challenge.
ZOOM_WEBHOOK_SECRET = os.getenv("ZOOM_WEBHOOK_SECRET", "your-zoom-webhook-secret")
# NOTE(review): ZOOM_VERIFICATION_TOKEN, API_BASE_URL and API_KEY are not
# referenced anywhere in the visible part of this file — confirm they are
# consumed elsewhere before relying on them.
ZOOM_VERIFICATION_TOKEN = os.getenv("ZOOM_VERIFICATION_TOKEN", "your-zoom-verification-token")
API_BASE_URL = os.getenv("DEEPFAKE_API_URL", "http://localhost:8001")
API_KEY = os.getenv("DEEPFAKE_API_KEY", "your-secret-api-key")
# Store active meeting sessions
# Module-level dict keyed by the meeting id taken from the webhook payload.
# Each value is a dict holding "participants" and "alerts" (plus "topic" and
# "started_at" when created by the meeting-started handler). Entries are
# removed by the meeting-ended handler.
active_meetings = {}
class ZoomWebhookEvent(BaseModel):
    """
    Schema for the JSON envelope of a Zoom webhook notification

    Attributes:
        event: Event name, e.g. "meeting.started"
        payload: Event-specific data delivered with the event
    """

    event: str
    payload: Dict[str, Any]
def verify_zoom_webhook(
    request_body: bytes,
    signature: str,
    timestamp: str,
    secret: Optional[str] = None,
) -> bool:
    """
    Verify Zoom webhook signature

    The expected value is "v0=" followed by the hex HMAC-SHA256 of
    "v0:{timestamp}:" + request_body, keyed with the webhook secret. It is
    compared with the received header in constant time.

    Args:
        request_body: Raw request body
        signature: X-Zm-Signature header
        timestamp: X-Zm-Request-Timestamp header
        secret: Signing key to use; defaults to ZOOM_WEBHOOK_SECRET when None

    Returns:
        True if signature is valid, False otherwise (including when the
        signature is not a string)
    """
    # A missing or non-string header can never match; report it as invalid
    # instead of letting compare_digest() raise.
    if not isinstance(signature, str):
        return False

    key = ZOOM_WEBHOOK_SECRET if secret is None else secret

    # Create message to hash
    message = f"v0:{timestamp}:".encode('utf-8') + request_body

    # Calculate expected signature
    digest = hmac.new(key.encode('utf-8'), message, hashlib.sha256).hexdigest()
    expected_signature = f"v0={digest}"

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # that contain non-ASCII characters, and the header value is supplied by
    # the (untrusted) caller.
    return hmac.compare_digest(
        expected_signature.encode('utf-8'),
        signature.encode('utf-8'),
    )
@app.get("/health")
async def health_check():
    """Report service liveness and how many meetings are currently tracked"""
    monitored = len(active_meetings)
    return dict(
        status="healthy",
        service="Deepfake Hunter Zoom Webhook Handler",
        active_meetings=monitored,
    )
@app.post("/zoom/webhook")
async def handle_zoom_webhook(
    request: Request,
    x_zm_signature: Optional[str] = Header(None),
    x_zm_request_timestamp: Optional[str] = Header(None)
):
    """
    Handle Zoom webhook events

    Processes meeting events and triggers deepfake analysis for video streams.

    Args:
        request: Incoming request; the raw body is read for verification
        x_zm_signature: X-Zm-Signature header, if present
        x_zm_request_timestamp: X-Zm-Request-Timestamp header, if present

    Returns:
        The response of the event-specific handler, or {"status": "ok"} for
        event types that are not handled

    Raises:
        HTTPException: 401 when a supplied signature does not verify, 400
            when the body is not a JSON object (or its payload is not an
            object), 500 for any unexpected failure
    """
    try:
        # Get raw body for signature verification
        body = await request.body()

        # Verify webhook signature (in production)
        if x_zm_signature and x_zm_request_timestamp:
            if not verify_zoom_webhook(body, x_zm_signature, x_zm_request_timestamp):
                logger.warning("Invalid webhook signature")
                raise HTTPException(status_code=401, detail="Invalid signature")
        else:
            # NOTE(review): a request that omits either header bypasses
            # verification entirely. Kept for backward compatibility; a
            # production deployment should reject such requests.
            logger.warning("Webhook request without signature headers; verification skipped")

        # Parse event; a malformed body is the client's fault, not a 500
        try:
            event_data = json.loads(body)
        except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
            raise HTTPException(status_code=400, detail="Invalid JSON body") from exc
        if not isinstance(event_data, dict):
            raise HTTPException(status_code=400, detail="JSON body must be an object")

        event_type = event_data.get("event")
        payload = event_data.get("payload", {})
        # The handlers call .get() on the payload, so it must be a mapping
        if not isinstance(payload, dict):
            raise HTTPException(status_code=400, detail="payload must be an object")
        logger.info(f"Received Zoom event: {event_type}")

        # Handle different event types
        if event_type == "endpoint.url_validation":
            # Respond to URL validation
            return handle_url_validation(payload)
        elif event_type == "meeting.started":
            return await handle_meeting_started(payload)
        elif event_type == "meeting.participant_joined":
            return await handle_participant_joined(payload)
        elif event_type == "meeting.ended":
            return await handle_meeting_ended(payload)
        else:
            logger.info(f"Unhandled event type: {event_type}")
            return {"status": "ok"}
    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the
        # deliberate 400/401 responses above would be rewritten into a 500.
        raise
    except Exception as e:
        logger.error(f"Webhook handler error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
def handle_url_validation(
    payload: Dict[str, Any],
    secret: Optional[str] = None,
) -> Dict[str, str]:
    """
    Handle Zoom URL validation challenge

    Args:
        payload: Event payload with challenge (read from its "plainToken" key)
        secret: Signing key to use; defaults to ZOOM_WEBHOOK_SECRET when None

    Returns:
        Response with the plain token and its hex HMAC-SHA256 digest

    Raises:
        HTTPException: 400 when plainToken is missing, empty or not a string
    """
    plain_token = payload.get("plainToken")
    if not plain_token:
        raise HTTPException(status_code=400, detail="Missing plainToken")
    # A truthy non-string (number, list, ...) would otherwise fail at
    # .encode() with AttributeError instead of a client error.
    if not isinstance(plain_token, str):
        raise HTTPException(status_code=400, detail="Invalid plainToken")

    key = ZOOM_WEBHOOK_SECRET if secret is None else secret

    # "Encrypt" token with webhook secret (keyed HMAC-SHA256, hex-encoded)
    encrypted_token = hmac.new(
        key.encode('utf-8'),
        plain_token.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    return {
        "plainToken": plain_token,
        "encryptedToken": encrypted_token
    }
async def handle_meeting_started(payload: Dict[str, Any]) -> Dict[str, str]:
    """
    Handle meeting started event

    Initializes deepfake detection for the meeting.

    Args:
        payload: Event payload; the meeting id and topic are read from its
            "object" entry

    Returns:
        {"status": "meeting_monitoring_started"}
    """
    meeting = payload.get("object", {})
    meeting_id = meeting.get("id")
    meeting_topic = meeting.get("topic", "Unknown")
    logger.info(f"Meeting started: {meeting_id} - {meeting_topic}")

    # handle_participant_joined creates a session on demand (without
    # "started_at") when a join event is processed before this one. Carry
    # over what such a session already tracked instead of wiping it. A
    # session that already has "started_at" is from an earlier start and is
    # reset, as before.
    existing = active_meetings.get(meeting_id)
    if existing is not None and "started_at" not in existing:
        participants = existing.get("participants", {})
        alerts = existing.get("alerts", [])
    else:
        participants = {}
        alerts = []

    # Initialize meeting session
    active_meetings[meeting_id] = {
        "topic": meeting_topic,
        "started_at": datetime.now().isoformat(),
        "participants": participants,
        "alerts": alerts
    }
    return {"status": "meeting_monitoring_started"}
async def handle_participant_joined(payload: Dict[str, Any]) -> Dict[str, str]:
    """
    Handle participant joined event

    Starts deepfake analysis for the participant's video stream.

    Args:
        payload: Event payload; the meeting id and the "participant" entry
            are read from its "object" entry

    Returns:
        {"status": "participant_monitoring_started"}
    """
    meeting = payload.get("object", {})
    meeting_id = meeting.get("id")
    participant = meeting.get("participant", {})

    # Participants are stored in a dict keyed by this id, so every
    # participant whose "id" is empty or missing would share one key and
    # overwrite the previous one. Fall back to other identifiers first.
    # NOTE(review): presumably Zoom leaves "id" blank for guests and supplies
    # "user_id" / "participant_uuid" instead — confirm against the webhook
    # reference.
    participant_id = (
        participant.get("id")
        or participant.get("user_id")
        or participant.get("participant_uuid")
    )
    participant_name = participant.get("user_name", "Unknown")
    logger.info(f"Participant joined: {participant_name} in meeting {meeting_id}")

    if meeting_id not in active_meetings:
        # No meeting.started event was seen for this meeting: create the
        # session on demand. "started_at" is left out on purpose because the
        # real start time is unknown here.
        active_meetings[meeting_id] = {
            "topic": meeting.get("topic", "Unknown"),
            "participants": {},
            "alerts": []
        }

    # Initialize participant tracking
    active_meetings[meeting_id]["participants"][participant_id] = {
        "name": participant_name,
        "joined_at": datetime.now().isoformat(),
        "deepfake_scores": [],
        "is_suspicious": False,
        "analysis_count": 0
    }

    # Note: Actual video stream analysis would be triggered here
    # For now, this is a placeholder
    # In production, you would:
    # 1. Subscribe to participant's video stream
    # 2. Periodically capture frames
    # 3. Send frames to deepfake detection API
    # 4. Update participant's deepfake_scores
    # 5. Alert host if scores consistently high
    logger.info(f"Started monitoring participant: {participant_name}")
    return {"status": "participant_monitoring_started"}
async def handle_meeting_ended(payload: Dict[str, Any]) -> Dict[str, str]:
    """
    React to a meeting.ended event

    Logs a short summary for the tracked meeting (if any) and drops its
    session from active_meetings.

    Args:
        payload: Event payload; the meeting id is read from its "object" entry

    Returns:
        {"status": "meeting_monitoring_ended"}
    """
    ended_id = payload.get("object", {}).get("id")
    logger.info(f"Meeting ended: {ended_id}")
    response = {"status": "meeting_monitoring_ended"}

    # Nothing tracked for this meeting: acknowledge and stop
    if ended_id not in active_meetings:
        return response

    roster = active_meetings[ended_id]["participants"]

    # Summary figures for the log line
    total_participants = len(roster)
    flagged = [entry for entry in roster.values() if entry.get("is_suspicious", False)]
    suspicious_count = len(flagged)
    logger.info(
        f"Meeting summary - Total participants: {total_participants}, "
        f"Suspicious: {suspicious_count}"
    )

    # Session is no longer needed once the summary has been logged
    del active_meetings[ended_id]
    return response
@app.post("/zoom/analyze_participant")
async def analyze_participant(
    meeting_id: str,
    participant_id: str,
    frame_data: str
):
    """
    Analyze one video frame of a participant (placeholder)

    The frame is not inspected yet: the call is logged and a fixed
    "not a deepfake" result is returned.

    Args:
        meeting_id: Zoom meeting ID
        participant_id: Participant ID
        frame_data: Base64-encoded frame data (currently unused)

    Returns:
        Dict echoing both ids, with is_deepfake False, confidence 0.0 and
        the current time as an ISO-8601 string
    """
    # Intended flow once implemented:
    # 1. Decode frame data
    # 2. Send to deepfake detection API
    # 3. Update participant scores
    # 4. Generate alerts if needed
    logger.info(f"Analyzing participant {participant_id} in meeting {meeting_id}")

    # Placeholder response
    result = {"meeting_id": meeting_id, "participant_id": participant_id}
    result["is_deepfake"] = False
    result["confidence"] = 0.0
    result["timestamp"] = datetime.now().isoformat()
    return result
@app.get("/zoom/meetings")
async def get_active_meetings():
    """
    Summarize every meeting currently being monitored

    Returns:
        Dict with the number of tracked meetings and, per meeting id, its
        topic plus participant and alert counts
    """
    overview = {}
    for tracked_id, session in active_meetings.items():
        overview[tracked_id] = {
            "topic": session.get("topic", "Unknown"),
            "participants": len(session.get("participants", {})),
            "alerts": len(session.get("alerts", [])),
        }
    return {"active_meetings": len(active_meetings), "meetings": overview}
@app.get("/zoom/meeting/{meeting_id}")
async def get_meeting_details(meeting_id: str):
    """
    Return the stored session for one meeting

    Args:
        meeting_id: Meeting id taken from the URL path

    Returns:
        The session dict held in active_meetings for that id

    Raises:
        HTTPException: 404 when the meeting is not being tracked
    """
    if meeting_id in active_meetings:
        return active_meetings[meeting_id]
    raise HTTPException(status_code=404, detail="Meeting not found")
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8002
    logger.info("Starting Deepfake Hunter Zoom Webhook Handler...")
    server_options = {"host": "0.0.0.0", "port": 8002, "log_level": "info"}
    uvicorn.run(app, **server_options)