Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import requests | |
| import json | |
| import aiohttp | |
| SLACK_WORKFLOW_URL = os.getenv("SLACK_WORKFLOW_URL", "") | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| async def send_to_workflow(data: dict) -> bool: | |
| if not SLACK_WORKFLOW_URL: | |
| logger.info("No workflow URL configured") | |
| return False | |
| try: | |
| payload = json.dumps(data) | |
| headers = { | |
| 'Content-Type': 'application/json' | |
| } | |
| logger.info(f"Sending to workflow: {payload}") | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post( | |
| SLACK_WORKFLOW_URL, | |
| headers=headers, | |
| data=payload, | |
| timeout=aiohttp.ClientTimeout(total=10) | |
| ) as response: | |
| response_text = await response.text() | |
| logger.info(f"Workflow response status: {response.status}") | |
| logger.info(f"Workflow response body: {response_text}") | |
| response.raise_for_status() | |
| logger.info("Sent to workflow successfully") | |
| return True | |
| except aiohttp.ClientError as e: | |
| logger.error(f"Workflow HTTP error: {e}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Workflow error: {e}") | |
| return False |