ankush-003 commited on
Commit
bf1e5bd
·
1 Parent(s): 7a6bd0d

using async processing

Browse files
Files changed (3) hide show
  1. requirements.txt +1 -0
  2. routers/slack.py +55 -25
  3. slack/workflows.py +19 -10
requirements.txt CHANGED
@@ -1,3 +1,4 @@
 
1
  fastapi
2
  uvicorn[standard]
3
  httpx
 
1
+ aiohttp
2
  fastapi
3
  uvicorn[standard]
4
  httpx
routers/slack.py CHANGED
@@ -1,6 +1,8 @@
1
- from fastapi import APIRouter, Request, HTTPException
2
  import json
3
  import logging
 
 
4
  from crew.crew import SlackCrew
5
  from slack.utils import verify_slack_signature
6
  from slack.workflows import send_to_workflow
@@ -13,10 +15,46 @@ router = APIRouter(
13
  logging.basicConfig(level=logging.INFO)
14
  logger = logging.getLogger(__name__)
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  @router.post("/slack")
17
- async def handle_slack(request: Request):
18
- """Single endpoint for all Slack requests"""
19
- crew = SlackCrew().crew()
20
  try:
21
  # Get request data
22
  timestamp = request.headers.get("X-Slack-Request-Timestamp", "")
@@ -64,29 +102,21 @@ async def handle_slack(request: Request):
64
  if payload.get("event", {}).get("subtype") == "bot_message":
65
  return {"status": "ignored"}
66
 
67
- # Process with AI if needed
68
- crew_response = ""
69
  if text:
70
- crew_response = crew.kickoff(inputs = {
71
- "question": text,
72
- })
73
- logger.info(f"AI processed: {text[:50]}...")
74
-
75
- # Prepare workflow data
76
- workflow_data = {
77
- "user": user,
78
- "query": text,
79
- "message": text
80
- }
81
-
82
- # Add crew response to message if available
83
- if crew_response:
84
- workflow_data["message"] = f"{crew_response}"
85
-
86
- # Send to workflow
87
- send_to_workflow(workflow_data)
88
 
89
- return {"status": "ok"}
 
90
 
91
  except HTTPException:
92
  raise
 
1
+ from fastapi import APIRouter, Request, HTTPException, BackgroundTasks
2
  import json
3
  import logging
4
+ import asyncio
5
+ from concurrent.futures import ThreadPoolExecutor
6
  from crew.crew import SlackCrew
7
  from slack.utils import verify_slack_signature
8
  from slack.workflows import send_to_workflow
 
15
  logging.basicConfig(level=logging.INFO)
16
  logger = logging.getLogger(__name__)
17
 
18
+ # Thread pool for CPU-bound tasks
19
+ executor = ThreadPoolExecutor(max_workers=4)
20
+
21
+ async def process_with_crew_async(text: str, user: str, channel: str):
22
+ """Process message with CrewAI in background"""
23
+ try:
24
+ crew = SlackCrew().crew()
25
+
26
+ # Run the crew kickoff in a thread pool since it's likely CPU-bound
27
+ loop = asyncio.get_event_loop()
28
+ crew_response = await loop.run_in_executor(
29
+ executor,
30
+ lambda: crew.kickoff(inputs={"question": text})
31
+ )
32
+
33
+ logger.info(f"AI processed: {text[:50]}... -> Response length: {len(str(crew_response))}")
34
+
35
+ # Send the AI response to workflow
36
+ workflow_data = {
37
+ "user": user,
38
+ "query": text,
39
+ "message": str(crew_response),
40
+ }
41
+
42
+ # Send to workflow asynchronously
43
+ await send_to_workflow(workflow_data)
44
+
45
+ except Exception as e:
46
+ logger.error(f"Background crew processing error: {e}")
47
+ # Optionally send error to workflow or handle gracefully
48
+ error_data = {
49
+ "user": user,
50
+ "query": text,
51
+ "message": f"Sorry, I encountered an error processing your request: {str(e)}",
52
+ }
53
+ await send_to_workflow(error_data)
54
+
55
  @router.post("/slack")
56
+ async def handle_slack(request: Request, background_tasks: BackgroundTasks):
57
+ """Single endpoint for all Slack requests - now with async processing"""
 
58
  try:
59
  # Get request data
60
  timestamp = request.headers.get("X-Slack-Request-Timestamp", "")
 
102
  if payload.get("event", {}).get("subtype") == "bot_message":
103
  return {"status": "ignored"}
104
 
105
+ # Process with AI in background if needed
 
106
  if text:
107
+ # Add the crew processing to background tasks
108
+ background_tasks.add_task(process_with_crew_async, text, user, channel)
109
+
110
+ # Optionally send immediate acknowledgment
111
+ ack_data = {
112
+ "user": user,
113
+ "query": text,
114
+ "message": "Processing your request... I'll get back to you shortly!",
115
+ }
116
+ await send_to_workflow(ack_data)
 
 
 
 
 
 
 
 
117
 
118
+ # Return immediately
119
+ return {"status": "ok", "message": "Processing in background"}
120
 
121
  except HTTPException:
122
  raise
slack/workflows.py CHANGED
@@ -2,14 +2,14 @@ import os
2
  import logging
3
  import requests
4
  import json
 
5
 
6
  SLACK_WORKFLOW_URL = os.getenv("SLACK_WORKFLOW_URL", "")
7
 
8
  logger = logging.getLogger(__name__)
9
  logging.basicConfig(level=logging.INFO)
10
 
11
- def send_to_workflow(data: dict) -> bool:
12
- """Send data to Slack workflow"""
13
  if not SLACK_WORKFLOW_URL:
14
  logger.info("No workflow URL configured")
15
  return False
@@ -21,14 +21,23 @@ def send_to_workflow(data: dict) -> bool:
21
  }
22
 
23
  logger.info(f"Sending to workflow: {payload}")
24
- response = requests.post(SLACK_WORKFLOW_URL, headers=headers, data=payload, timeout=10)
25
- logger.info(f"Workflow response status: {response.status_code}")
26
- logger.info(f"Workflow response body: {response.text}")
27
- response.raise_for_status()
28
- logger.info("Sent to workflow successfully")
29
- return True
30
- except requests.exceptions.HTTPError as e:
31
- logger.error(f"Workflow HTTP error {response.status_code}: {response.text}")
 
 
 
 
 
 
 
 
 
32
  return False
33
  except Exception as e:
34
  logger.error(f"Workflow error: {e}")
 
2
  import logging
3
  import requests
4
  import json
5
+ import aiohttp
6
 
7
  SLACK_WORKFLOW_URL = os.getenv("SLACK_WORKFLOW_URL", "")
8
 
9
  logger = logging.getLogger(__name__)
10
  logging.basicConfig(level=logging.INFO)
11
 
12
+ async def send_to_workflow(data: dict) -> bool:
 
13
  if not SLACK_WORKFLOW_URL:
14
  logger.info("No workflow URL configured")
15
  return False
 
21
  }
22
 
23
  logger.info(f"Sending to workflow: {payload}")
24
+
25
+ async with aiohttp.ClientSession() as session:
26
+ async with session.post(
27
+ SLACK_WORKFLOW_URL,
28
+ headers=headers,
29
+ data=payload,
30
+ timeout=aiohttp.ClientTimeout(total=10)
31
+ ) as response:
32
+ response_text = await response.text()
33
+ logger.info(f"Workflow response status: {response.status}")
34
+ logger.info(f"Workflow response body: {response_text}")
35
+ response.raise_for_status()
36
+ logger.info("Sent to workflow successfully")
37
+ return True
38
+
39
+ except aiohttp.ClientError as e:
40
+ logger.error(f"Workflow HTTP error: {e}")
41
  return False
42
  except Exception as e:
43
  logger.error(f"Workflow error: {e}")