File size: 1,015 Bytes
f77f60f
 
644db45
 
f77f60f
 
 
644db45
f77f60f
644db45
 
f77f60f
 
644db45
f77f60f
644db45
f77f60f
 
644db45
f77f60f
644db45
 
 
 
 
 
f77f60f
 
644db45
f77f60f
 
644db45
f77f60f
644db45
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# app/qstash_client.py
import logging
from typing import Optional, Dict, Any
from app.deps import get_qstash_client  # ✅ Import from existing logic

logger = logging.getLogger(__name__)

def is_qstash_available() -> bool:
    """
    Report whether the QStash client can be obtained.

    Calls get_qstash_client() (imported from app.deps) purely as a probe;
    the returned client is discarded.

    Returns:
        True if get_qstash_client() returns without raising RuntimeError,
        False if it raises RuntimeError. Any other exception type is not
        caught here and propagates to the caller.
    """
    try:
        get_qstash_client()
    except RuntimeError:
        # The probe failed with the one error this check is meant to absorb.
        return False
    else:
        return True

def publish_message(url: str, body: Dict[str, Any], callback: Optional[str] = None) -> Dict[str, Any]:
    """
    Publish a message to QStash using the singleton client from deps.

    A dict (or any other non-string) payload is sent through the client's
    ``message.publish_json`` so it is serialized as JSON. A payload that is
    already ``str``/``bytes`` is forwarded unchanged through
    ``message.publish``, which keeps pre-serialized callers working.

    NOTE(review): this assumes get_qstash_client() returns the Upstash
    ``qstash`` SDK client, whose ``message.publish`` only accepts str/bytes
    bodies and whose ``message.publish_json`` handles JSON-serializable
    objects -- confirm against app/deps.py.

    Args:
        url: Endpoint URL to call
        body: JSON payload
        callback: Optional callback URL

    Returns:
        Dict with message_id

    Raises:
        RuntimeError: If QStash not initialized (propagated from
            get_qstash_client()).
    """
    client = get_qstash_client()

    # Pick the sender by payload type: raw text/bytes go out as-is, anything
    # else must be JSON-encoded by the client before it hits the wire.
    if isinstance(body, (str, bytes)):
        send = client.message.publish
    else:
        send = client.message.publish_json

    result = send(url=url, body=body, callback=callback)

    return {"message_id": result.message_id}