# slack-agi / slack / workflows.py
# Last commit bf1e5bd by ankush-003: "using async processing"
import asyncio
import json
import logging
import os

import aiohttp
import requests
# Configure root logging at INFO level (basicConfig is a no-op when the root
# logger already has handlers), then create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Target URL for workflow posts, read once at import time from the
# environment; falls back to an empty string when the variable is unset.
SLACK_WORKFLOW_URL = os.environ.get("SLACK_WORKFLOW_URL", "")
async def send_to_workflow(data: dict, *, timeout: float = 10.0) -> bool:
    """POST ``data`` as a JSON body to the URL in ``SLACK_WORKFLOW_URL``.

    This is a best-effort call: every failure is logged and reported through
    the return value, and no exception is propagated to the caller.

    Args:
        data: JSON-serializable mapping sent as the request body.
        timeout: Total time budget for the request, in seconds. Defaults to
            10, the value that was previously hard-coded.

    Returns:
        True when the request completed and ``raise_for_status()`` did not
        raise (status below 400). False when no URL is configured, the
        request timed out, the HTTP client reported an error, or any other
        error occurred (for example ``data`` could not be serialized).
    """
    if not SLACK_WORKFLOW_URL:
        logger.info("No workflow URL configured")
        return False

    try:
        payload = json.dumps(data)
        headers = {"Content-Type": "application/json"}
        logger.info("Sending to workflow: %s", payload)

        async with aiohttp.ClientSession() as session:
            async with session.post(
                SLACK_WORKFLOW_URL,
                headers=headers,
                data=payload,
                timeout=aiohttp.ClientTimeout(total=timeout),
            ) as response:
                # Read and log the body before raise_for_status() so that the
                # content of error responses is captured in the logs too.
                response_text = await response.text()
                logger.info("Workflow response status: %s", response.status)
                logger.info("Workflow response body: %s", response_text)
                response.raise_for_status()

        logger.info("Sent to workflow successfully")
        return True
    except asyncio.TimeoutError:
        # A total-timeout expiry is not an aiohttp.ClientError and its str()
        # is empty, so it needs its own handler to produce a useful message.
        # Listed first so timeout subclasses of ClientError land here as well.
        logger.error(
            "Workflow HTTP error: request timed out after %s seconds", timeout
        )
        return False
    except aiohttp.ClientError as e:
        logger.error("Workflow HTTP error: %s", e)
        return False
    except Exception as e:
        # Deliberate catch-all: this function must never raise. Keep the
        # traceback so unexpected failures remain diagnosable.
        logger.exception("Workflow error: %s", e)
        return False