""" Deepfake Hunter - Zoom Webhook Handler Handles Zoom meeting webhooks and processes video streams for deepfake detection. Author: Deepfake Hunter Team License: MIT """ import os import json import hmac import hashlib from typing import Dict, Any, Optional from datetime import datetime from fastapi import FastAPI, Request, HTTPException, Header from pydantic import BaseModel import uvicorn from loguru import logger # Initialize FastAPI app = FastAPI(title="Deepfake Hunter - Zoom Webhook Handler") # Configuration ZOOM_WEBHOOK_SECRET = os.getenv("ZOOM_WEBHOOK_SECRET", "your-zoom-webhook-secret") ZOOM_VERIFICATION_TOKEN = os.getenv("ZOOM_VERIFICATION_TOKEN", "your-zoom-verification-token") API_BASE_URL = os.getenv("DEEPFAKE_API_URL", "http://localhost:8001") API_KEY = os.getenv("DEEPFAKE_API_KEY", "your-secret-api-key") # Store active meeting sessions active_meetings = {} class ZoomWebhookEvent(BaseModel): """Zoom webhook event model""" event: str payload: Dict[str, Any] def verify_zoom_webhook( request_body: bytes, signature: str, timestamp: str ) -> bool: """ Verify Zoom webhook signature Args: request_body: Raw request body signature: X-Zm-Signature header timestamp: X-Zm-Request-Timestamp header Returns: True if signature is valid """ # Create message to hash message = f"v0:{timestamp}:".encode('utf-8') + request_body # Calculate expected signature expected_signature = hmac.new( ZOOM_WEBHOOK_SECRET.encode('utf-8'), message, hashlib.sha256 ).hexdigest() expected_signature = f"v0={expected_signature}" return hmac.compare_digest(expected_signature, signature) @app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "service": "Deepfake Hunter Zoom Webhook Handler", "active_meetings": len(active_meetings) } @app.post("/zoom/webhook") async def handle_zoom_webhook( request: Request, x_zm_signature: Optional[str] = Header(None), x_zm_request_timestamp: Optional[str] = Header(None) ): """ Handle Zoom webhook events Processes meeting events and triggers deepfake analysis for video streams. """ try: # Get raw body for signature verification body = await request.body() # Verify webhook signature (in production) if x_zm_signature and x_zm_request_timestamp: if not verify_zoom_webhook(body, x_zm_signature, x_zm_request_timestamp): logger.warning("Invalid webhook signature") raise HTTPException(status_code=401, detail="Invalid signature") # Parse event event_data = json.loads(body) event_type = event_data.get("event") payload = event_data.get("payload", {}) logger.info(f"Received Zoom event: {event_type}") # Handle different event types if event_type == "endpoint.url_validation": # Respond to URL validation return handle_url_validation(payload) elif event_type == "meeting.started": return await handle_meeting_started(payload) elif event_type == "meeting.participant_joined": return await handle_participant_joined(payload) elif event_type == "meeting.ended": return await handle_meeting_ended(payload) else: logger.info(f"Unhandled event type: {event_type}") return {"status": "ok"} except Exception as e: logger.error(f"Webhook handler error: {e}") raise HTTPException(status_code=500, detail=str(e)) def handle_url_validation(payload: Dict[str, Any]) -> Dict[str, str]: """ Handle Zoom URL validation challenge Args: payload: Event payload with challenge Returns: Response with encrypted token """ plain_token = payload.get("plainToken") if not plain_token: raise HTTPException(status_code=400, detail="Missing plainToken") # Encrypt token with webhook secret encrypted_token = hmac.new( ZOOM_WEBHOOK_SECRET.encode('utf-8'), plain_token.encode('utf-8'), hashlib.sha256 ).hexdigest() return { "plainToken": plain_token, "encryptedToken": encrypted_token } async def handle_meeting_started(payload: Dict[str, Any]) -> Dict[str, str]: """ Handle meeting started event Initializes deepfake detection for the meeting. """ meeting_id = payload.get("object", {}).get("id") meeting_topic = payload.get("object", {}).get("topic", "Unknown") logger.info(f"Meeting started: {meeting_id} - {meeting_topic}") # Initialize meeting session active_meetings[meeting_id] = { "topic": meeting_topic, "started_at": datetime.now().isoformat(), "participants": {}, "alerts": [] } return {"status": "meeting_monitoring_started"} async def handle_participant_joined(payload: Dict[str, Any]) -> Dict[str, str]: """ Handle participant joined event Starts deepfake analysis for the participant's video stream. """ meeting_id = payload.get("object", {}).get("id") participant = payload.get("object", {}).get("participant", {}) participant_id = participant.get("id") participant_name = participant.get("user_name", "Unknown") logger.info(f"Participant joined: {participant_name} in meeting {meeting_id}") if meeting_id not in active_meetings: active_meetings[meeting_id] = { "participants": {}, "alerts": [] } # Initialize participant tracking active_meetings[meeting_id]["participants"][participant_id] = { "name": participant_name, "joined_at": datetime.now().isoformat(), "deepfake_scores": [], "is_suspicious": False, "analysis_count": 0 } # Note: Actual video stream analysis would be triggered here # For now, this is a placeholder # In production, you would: # 1. Subscribe to participant's video stream # 2. Periodically capture frames # 3. Send frames to deepfake detection API # 4. Update participant's deepfake_scores # 5. Alert host if scores consistently high logger.info(f"Started monitoring participant: {participant_name}") return {"status": "participant_monitoring_started"} async def handle_meeting_ended(payload: Dict[str, Any]) -> Dict[str, str]: """ Handle meeting ended event Cleanup and generate final report. """ meeting_id = payload.get("object", {}).get("id") logger.info(f"Meeting ended: {meeting_id}") if meeting_id in active_meetings: meeting_data = active_meetings[meeting_id] # Generate summary total_participants = len(meeting_data["participants"]) suspicious_count = sum( 1 for p in meeting_data["participants"].values() if p.get("is_suspicious", False) ) logger.info( f"Meeting summary - Total participants: {total_participants}, " f"Suspicious: {suspicious_count}" ) # Cleanup del active_meetings[meeting_id] return {"status": "meeting_monitoring_ended"} @app.post("/zoom/analyze_participant") async def analyze_participant( meeting_id: str, participant_id: str, frame_data: str ): """ Analyze participant video frame Args: meeting_id: Zoom meeting ID participant_id: Participant ID frame_data: Base64-encoded frame data Returns: Analysis results """ # This endpoint would be called periodically to analyze participant frames # Implementation would: # 1. Decode frame data # 2. Send to deepfake detection API # 3. Update participant scores # 4. Generate alerts if needed logger.info(f"Analyzing participant {participant_id} in meeting {meeting_id}") # Placeholder response return { "meeting_id": meeting_id, "participant_id": participant_id, "is_deepfake": False, "confidence": 0.0, "timestamp": datetime.now().isoformat() } @app.get("/zoom/meetings") async def get_active_meetings(): """Get list of active meetings being monitored""" return { "active_meetings": len(active_meetings), "meetings": { meeting_id: { "topic": data.get("topic", "Unknown"), "participants": len(data.get("participants", {})), "alerts": len(data.get("alerts", [])) } for meeting_id, data in active_meetings.items() } } @app.get("/zoom/meeting/{meeting_id}") async def get_meeting_details(meeting_id: str): """Get details for specific meeting""" if meeting_id not in active_meetings: raise HTTPException(status_code=404, detail="Meeting not found") return active_meetings[meeting_id] if __name__ == "__main__": logger.info("Starting Deepfake Hunter Zoom Webhook Handler...") uvicorn.run( app, host="0.0.0.0", port=8002, log_level="info" )