NeerajCodz commited on
Commit
61a5bea
Β·
verified Β·
1 Parent(s): 7d36f40
Files changed (1) hide show
  1. app.py +250 -138
app.py CHANGED
@@ -4,18 +4,21 @@ import re
4
  import logging
5
  import asyncio
6
  from typing import List, Dict, Any
 
7
 
8
  # Core Web Framework and Async Handlers
9
  from fastapi import FastAPI, Request
10
- from fastapi.responses import HTMLResponse
11
  from uvicorn import run as uvicorn_run
12
 
13
  # Slack Libraries (Async versions)
14
  from slack_bolt.async_app import AsyncApp
15
  from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
16
- from slack_sdk.web.async_client import AsyncWebClient
17
  from slack_sdk.oauth.installation_store.async_installation_store import AsyncInstallationStore
18
- from slack_sdk.oauth.installation_store.models import Installation
 
 
19
  from slack_bolt.authorization import AuthorizeResult
20
 
21
  # RAG/ML Libraries
@@ -43,25 +46,21 @@ SLACK_CLIENT_ID = os.environ.get("SLACK_CLIENT_ID")
43
  SLACK_CLIENT_SECRET = os.environ.get("SLACK_CLIENT_SECRET")
44
  HF_TOKEN = os.environ.get("HF_TOKEN")
45
 
46
- # Validate bot token format
47
- if SLACK_BOT_TOKEN:
48
- if not SLACK_BOT_TOKEN.startswith("xoxb-"):
49
- raise ValueError(
50
- "❌ SLACK_BOT_TOKEN must start with 'xoxb-' (Bot User OAuth Token).\n"
51
- "You're using the wrong token type. Go to:\n"
52
- "https://api.slack.com/apps β†’ Your App β†’ OAuth & Permissions\n"
53
- "Copy the 'Bot User OAuth Token' (starts with xoxb-), NOT the User OAuth Token (xoxp-)"
54
- )
55
- logger.info("βœ“ Valid bot token format detected (xoxb-)")
56
- else:
57
- raise ValueError("SLACK_BOT_TOKEN is required")
58
-
59
  # Check for required environment variables
60
  required_vars = [SUPABASE_URL, SUPABASE_KEY, SLACK_CLIENT_ID, SLACK_CLIENT_SECRET, SLACK_SIGNING_SECRET]
61
  if not all(required_vars):
62
  missing = [var for var in required_vars if not var]
63
  raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
64
 
 
 
 
 
 
 
 
 
 
65
  # Set HF_TOKEN if provided
66
  if HF_TOKEN:
67
  try:
@@ -73,54 +72,150 @@ if HF_TOKEN:
73
  # Initialize Supabase client
74
  supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
75
 
76
- # --- Supabase Async Installation Store ---
77
  class SupabaseAsyncInstallationStore(AsyncInstallationStore):
78
  def __init__(self, supabase_client):
79
  self.supabase = supabase_client
 
 
 
 
 
80
 
81
  async def save(self, installation: Installation) -> None:
 
82
  data = {
83
  "team_id": installation.team_id,
 
84
  "bot_token": installation.bot_token,
85
  "bot_user_id": installation.bot_user_id,
86
  "bot_scopes": ",".join(installation.bot_scopes) if installation.bot_scopes else None,
87
  "app_id": installation.app_id,
 
 
 
 
88
  }
89
  try:
90
  result = await asyncio.to_thread(
91
  lambda: self.supabase.table("installations").upsert(data, on_conflict="team_id").execute()
92
  )
93
- logger.info(f"Saved installation for team: {installation.team_id}")
 
 
 
94
  except Exception as e:
95
  logger.error(f"Failed to save installation for team {installation.team_id}: {e}")
96
  raise
97
 
98
- async def fetch_installation(self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None) -> Installation | None:
99
- logger.info(f"Fetching installation for team_id: {team_id}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
100
  try:
101
  result = await asyncio.to_thread(
102
  lambda: self.supabase.table("installations").select("*").eq("team_id", team_id).execute()
103
  )
104
  if not result.data:
105
- logger.warning(f"No installation found for team {team_id}")
106
  return None
107
 
108
  data = result.data[0]
109
  return Installation(
110
  app_id=data.get("app_id"),
111
- enterprise_id=enterprise_id,
112
  team_id=team_id,
113
- user_id=user_id,
114
  bot_token=data.get("bot_token"),
115
  bot_user_id=data.get("bot_user_id"),
116
  bot_scopes=data.get("bot_scopes", "").split(',') if data.get("bot_scopes") else [],
117
- incoming_webhook=None,
 
118
  )
119
  except Exception as e:
120
  logger.error(f"Failed to fetch installation for team {team_id}: {e}")
121
  return None
122
 
123
- async def delete(self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
124
  try:
125
  await asyncio.to_thread(
126
  lambda: self.supabase.table("installations").delete().eq("team_id", team_id).execute()
@@ -132,29 +227,49 @@ class SupabaseAsyncInstallationStore(AsyncInstallationStore):
132
 
133
  # Initialize installation store
134
  installation_store = SupabaseAsyncInstallationStore(supabase)
135
- logger.info("Using single-team mode with SLACK_BOT_TOKEN.")
136
 
137
- # Custom authorize function
138
  async def async_authorize(enterprise_id, team_id, user_id):
139
- """
140
- Custom authorization function for single-team mode.
141
- Returns AuthorizeResult using the SLACK_BOT_TOKEN.
142
- """
143
- logger.info(f"Authorizing request for team: {team_id}, user: {user_id}")
144
 
145
- # For single-team, always use the configured bot token
146
- return AuthorizeResult(
147
  enterprise_id=enterprise_id,
148
  team_id=team_id,
149
- bot_user_id=None, # Will be populated by Slack SDK
150
- bot_token=SLACK_BOT_TOKEN,
151
- user_id=user_id,
152
  )
 
 
 
 
 
 
 
 
 
 
 
 
153
 
154
- # Initialize Bolt Async App (simplified for single-team)
155
  app = AsyncApp(
156
  signing_secret=SLACK_SIGNING_SECRET,
157
- token=SLACK_BOT_TOKEN, # Use token directly for single-team mode
 
 
 
 
 
 
 
 
 
 
 
 
 
 
158
  process_before_response=True,
159
  )
160
 
@@ -171,26 +286,30 @@ qa_pipeline = None
171
  try:
172
  embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
173
  print("Loading QA model...")
174
- qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2", device=0 if device == 'cuda' else -1)
 
 
 
 
175
  print("Models loaded successfully!")
176
  except Exception as e:
177
  logger.error(f"Error loading models: {e}")
178
- raise RuntimeError("Failed to load required ML models. Check dependencies and hardware.")
179
 
180
  # --- Utility Functions ---
181
  def download_slack_file(url: str, token: str) -> bytes:
182
- """Downloads a file from Slack using the private download URL and bot token."""
183
  try:
184
  headers = {"Authorization": f"Bearer {token}"}
185
  response = requests.get(url, headers=headers, stream=True, timeout=30)
186
  response.raise_for_status()
187
  return response.content
188
  except requests.RequestException as e:
189
- logger.error(f"Failed to download Slack file from {url}: {e}")
190
  raise
191
 
192
  def extract_text_from_pdf(file_content: bytes) -> str:
193
- """Extracts text from PDF bytes."""
194
  try:
195
  pdf_reader = pypdf.PdfReader(io.BytesIO(file_content))
196
  text = ""
@@ -200,43 +319,34 @@ def extract_text_from_pdf(file_content: bytes) -> str:
200
  text += extracted + "\n"
201
  return text
202
  except Exception as e:
203
- logger.error(f"Failed to extract text from PDF: {e}")
204
  raise
205
 
206
  def extract_text_from_docx(file_content: bytes) -> str:
207
- """Extracts text from DOCX bytes."""
208
  try:
209
  doc = Document(io.BytesIO(file_content))
210
- text = ""
211
- for paragraph in doc.paragraphs:
212
- text += paragraph.text + "\n"
213
- return text
214
  except Exception as e:
215
- logger.error(f"Failed to extract text from DOCX: {e}")
216
  raise
217
 
218
  def chunk_text(text: str, chunk_size: int = 300) -> List[str]:
219
- """Chunks text by word count to create manageable RAG chunks."""
220
  words = text.split()
221
- chunks = []
222
- for i in range(0, len(words), chunk_size):
223
- chunk = " ".join(words[i:i + chunk_size])
224
- if chunk.strip():
225
- chunks.append(chunk)
226
- return chunks
227
 
228
  def embed_text(text: str) -> List[float]:
229
- """Generates an embedding for a piece of text."""
230
- if embedding_model is None:
231
  raise RuntimeError("Embedding model not loaded.")
232
- embedding = embedding_model.encode(text, convert_to_tensor=False)
233
- return embedding.tolist()
234
 
235
  async def store_embeddings(chunks: List[str]):
236
- """Stores text chunks and their embeddings in Supabase."""
237
  for chunk in chunks:
238
- embedding = embed_text(chunk)
239
  try:
 
240
  await asyncio.to_thread(
241
  lambda c=chunk, e=embedding: supabase.table("documents").insert({
242
  "content": c,
@@ -244,58 +354,56 @@ async def store_embeddings(chunks: List[str]):
244
  }).execute()
245
  )
246
  except Exception as e:
247
- logger.error(f"Failed to insert chunk into Supabase: {str(e)}")
248
 
249
  async def is_table_empty() -> bool:
250
- """Checks if the documents table has any records."""
251
  try:
252
  result = await asyncio.to_thread(
253
  lambda: supabase.table("documents").select("id", count="exact").limit(1).execute()
254
  )
255
  return result.count == 0
256
  except Exception as e:
257
- logger.error(f"Error checking table emptiness: {str(e)}")
258
  return True
259
 
260
  async def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
261
- """Searches Supabase for documents matching the query using vector similarity."""
262
- if embedding_model is None:
263
  raise RuntimeError("Embedding model not loaded.")
264
- query_embedding = embed_text(query)
265
  try:
 
266
  result = await asyncio.to_thread(
267
- lambda qe=query_embedding, mc=match_count: supabase.rpc("match_documents", {
268
- "query_embedding": qe,
269
- "match_count": mc
270
  }).execute()
271
  )
272
  return result.data
273
  except Exception as e:
274
- logger.error(f"Error searching documents for query '{query}': {str(e)}")
275
  return []
276
 
277
  async def answer_question(question: str, context: str) -> str:
278
- """Answers a question based on the provided context using the QA pipeline."""
279
- if qa_pipeline is None:
280
  raise RuntimeError("QA pipeline not loaded.")
281
  if not context.strip():
282
  return "No relevant documents found."
283
  try:
284
- MAX_CONTEXT_LEN = 4096
285
- context_slice = context[:MAX_CONTEXT_LEN]
286
-
287
  result = await asyncio.to_thread(
288
  lambda: qa_pipeline(question=question, context=context_slice)
289
  )
290
  return result['answer']
291
  except Exception as e:
292
- logger.error(f"Error in QA pipeline: {str(e)}")
293
- return "Error generating answer from context."
294
 
295
- # --- Slack Handlers (Async) ---
296
  @app.event("file_shared")
297
  async def handle_file_shared(event, say, client):
298
- """Processes files shared in a channel and adds their content to the RAG knowledge base."""
299
  file_id = event["file_id"]
300
  try:
301
  file_info = await client.files_info(file=file_id)
@@ -305,11 +413,10 @@ async def handle_file_shared(event, say, client):
305
  file_url = file_data.get("url_private_download")
306
 
307
  if not file_url:
308
- await say("No download URL available for the file.")
309
  return
310
 
311
- token = SLACK_BOT_TOKEN
312
- file_content = await asyncio.to_thread(download_slack_file, file_url, token)
313
 
314
  text = ""
315
  if "pdf" in file_type:
@@ -317,71 +424,80 @@ async def handle_file_shared(event, say, client):
317
  elif "wordprocessingml" in file_type or "msword" in file_type:
318
  text = await asyncio.to_thread(extract_text_from_docx, file_content)
319
  else:
320
- await say("Unsupported file type. Please upload PDF or DOCX files.")
321
  return
322
 
323
  if not text.strip():
324
- await say("No text could be extracted from the file.")
325
  return
326
 
327
  chunks = chunk_text(text)
328
  await store_embeddings(chunks)
329
 
330
- await say(f"βœ… File processed successfully! Added **{len(chunks)}** chunks to the knowledge base.")
331
  except Exception as e:
332
- logger.error(f"Error in file_shared handler for file_id {file_id}: {str(e)}")
333
  await say(f"❌ Error processing file: {str(e)}")
334
 
335
  @app.event("app_mention")
336
- async def handle_mention(event, say, client):
337
- """Handles mentions (@bot) for answering questions using RAG."""
338
  text = event["text"]
339
  user_query = re.sub(r'<@[A-Z0-9]+>', '', text).strip()
340
 
341
  if not user_query:
342
- await say("Please ask me a question after mentioning me!")
343
  return
344
 
345
  try:
346
  if await is_table_empty():
347
- await say("My knowledge base is empty. Please share some PDF or DOCX files first so I can learn!")
348
  return
349
 
350
  results = await search_documents(user_query, match_count=5)
351
 
352
  if not results:
353
- await say("I couldn't find any relevant information in my knowledge base. Try uploading more documents.")
354
  return
355
 
356
  context = " ".join([doc["content"] for doc in results])
357
  answer = await answer_question(user_query, context)
358
 
359
- await say(f"πŸ’‘ *Answer:* {answer}\n\n_I found this information by analyzing {len(results)} relevant document chunks._")
360
  except Exception as e:
361
- logger.error(f"Error in app_mention handler for query '{user_query}': {str(e)}")
362
- await say(f"❌ Error answering question: {str(e)}")
363
 
364
- # --- FastAPI Integration ---
365
  handler = AsyncSlackRequestHandler(app)
366
 
367
  @api.post("/slack/events")
368
  async def slack_events(request: Request):
369
- """Endpoint for all Slack event subscriptions."""
370
- try:
371
- logger.info("Received Slack event request")
372
- return await handler.handle(request)
373
- except Exception as e:
374
- logger.error(f"Error handling Slack events: {e}")
375
- raise
 
 
 
 
 
376
 
377
  @api.get("/")
378
  async def root():
379
- """Simple root endpoint."""
380
- return {"status": "Slack RAG Bot is running!", "message": "Use /slack/events endpoint for Slack events"}
 
 
 
 
381
 
382
  @api.get("/health")
383
  async def health():
384
- """Health check endpoint."""
385
  db_status = "error"
386
  try:
387
  await asyncio.to_thread(
@@ -389,41 +505,37 @@ async def health():
389
  )
390
  db_status = "ok"
391
  except Exception as e:
392
- logger.error(f"Database health check failed: {e}")
393
-
394
- models_status = "ok" if embedding_model and qa_pipeline else "error"
395
-
396
- # Check token validity
397
- token_status = "invalid"
398
- if SLACK_BOT_TOKEN and SLACK_BOT_TOKEN.startswith("xoxb-"):
399
- token_status = "valid_format"
400
 
401
  return {
402
- "status": "ok" if db_status == "ok" and models_status == "ok" else "error",
403
  "database": db_status,
404
- "models": models_status,
405
- "token": token_status,
406
- "mode": "single-team"
407
  }
408
 
409
- @api.get("/debug/token-check")
410
- async def debug_token():
411
- """Debug endpoint to verify token configuration."""
412
- token_info = {
413
- "token_set": bool(SLACK_BOT_TOKEN),
414
- "token_prefix": SLACK_BOT_TOKEN[:10] + "..." if SLACK_BOT_TOKEN else "NOT SET",
415
- "is_bot_token": SLACK_BOT_TOKEN.startswith("xoxb-") if SLACK_BOT_TOKEN else False,
416
- "is_user_token": SLACK_BOT_TOKEN.startswith("xoxp-") if SLACK_BOT_TOKEN else False,
417
- }
418
-
419
- if token_info["is_user_token"]:
420
- token_info["error"] = "❌ You're using a USER token (xoxp-). You need a BOT token (xoxb-)"
421
- token_info["fix"] = "Go to https://api.slack.com/apps β†’ Your App β†’ OAuth & Permissions β†’ Copy 'Bot User OAuth Token'"
422
-
423
- return token_info
 
 
 
424
 
425
  if __name__ == "__main__":
426
  port = int(os.environ.get("PORT", 7860))
427
- logger.info(f"Starting server on port {port}...")
428
- logger.info(f"Bot token validation: {'βœ“ Valid' if SLACK_BOT_TOKEN.startswith('xoxb-') else 'βœ— Invalid'}")
429
  uvicorn_run(api, host="0.0.0.0", port=port)
 
4
  import logging
5
  import asyncio
6
  from typing import List, Dict, Any
7
+ from datetime import datetime, timezone
8
 
9
  # Core Web Framework and Async Handlers
10
  from fastapi import FastAPI, Request
11
+ from fastapi.responses import HTMLResponse, RedirectResponse
12
  from uvicorn import run as uvicorn_run
13
 
14
  # Slack Libraries (Async versions)
15
  from slack_bolt.async_app import AsyncApp
16
  from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
17
+ from slack_bolt.oauth.async_oauth_settings import AsyncOAuthSettings
18
  from slack_sdk.oauth.installation_store.async_installation_store import AsyncInstallationStore
19
+ from slack_sdk.oauth.installation_store.models import Installation, Bot
20
+ from slack_sdk.oauth.state_store import FileOAuthStateStore
21
+ from slack_sdk.web.async_client import AsyncWebClient
22
  from slack_bolt.authorization import AuthorizeResult
23
 
24
  # RAG/ML Libraries
 
46
  SLACK_CLIENT_SECRET = os.environ.get("SLACK_CLIENT_SECRET")
47
  HF_TOKEN = os.environ.get("HF_TOKEN")
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  # Check for required environment variables
50
  required_vars = [SUPABASE_URL, SUPABASE_KEY, SLACK_CLIENT_ID, SLACK_CLIENT_SECRET, SLACK_SIGNING_SECRET]
51
  if not all(required_vars):
52
  missing = [var for var in required_vars if not var]
53
  raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
54
 
55
+ # Validate bot token if provided
56
+ if SLACK_BOT_TOKEN:
57
+ if SLACK_BOT_TOKEN.startswith("xoxp-"):
58
+ raise ValueError(
59
+ "❌ SLACK_BOT_TOKEN is a User OAuth Token (xoxp-).\n"
60
+ "You need the Bot User OAuth Token (xoxb-)"
61
+ )
62
+ logger.info("βœ“ Bot token provided")
63
+
64
  # Set HF_TOKEN if provided
65
  if HF_TOKEN:
66
  try:
 
72
  # Initialize Supabase client
73
  supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
74
 
75
+ # --- Supabase Installation Store with Token Rotation Support ---
76
  class SupabaseAsyncInstallationStore(AsyncInstallationStore):
77
  def __init__(self, supabase_client):
78
  self.supabase = supabase_client
79
+ self._client = AsyncWebClient()
80
+
81
+ async def async_save(self, installation: Installation):
82
+ """Save installation with token rotation support."""
83
+ return await self.save(installation)
84
 
85
  async def save(self, installation: Installation) -> None:
86
+ """Save installation including refresh token and expiration."""
87
  data = {
88
  "team_id": installation.team_id,
89
+ "enterprise_id": installation.enterprise_id,
90
  "bot_token": installation.bot_token,
91
  "bot_user_id": installation.bot_user_id,
92
  "bot_scopes": ",".join(installation.bot_scopes) if installation.bot_scopes else None,
93
  "app_id": installation.app_id,
94
+ "bot_refresh_token": installation.bot_refresh_token,
95
+ "bot_token_expires_at": installation.bot_token_expires_at,
96
+ "user_id": installation.user_id,
97
+ "installed_at": datetime.now(timezone.utc).isoformat(),
98
  }
99
  try:
100
  result = await asyncio.to_thread(
101
  lambda: self.supabase.table("installations").upsert(data, on_conflict="team_id").execute()
102
  )
103
+ logger.info(f"βœ“ Saved installation for team: {installation.team_id}")
104
+ if installation.bot_token_expires_at:
105
+ expires_dt = datetime.fromtimestamp(installation.bot_token_expires_at, tz=timezone.utc)
106
+ logger.info(f" Token expires at: {expires_dt}")
107
  except Exception as e:
108
  logger.error(f"Failed to save installation for team {installation.team_id}: {e}")
109
  raise
110
 
111
+ async def async_find_installation(
112
+ self,
113
+ *,
114
+ enterprise_id: str | None,
115
+ team_id: str | None,
116
+ user_id: str | None = None,
117
+ is_enterprise_install: bool | None = None,
118
+ ) -> Installation | None:
119
+ """Find installation with automatic token rotation."""
120
+ if not team_id:
121
+ return None
122
+
123
+ installation = await self.fetch_installation(
124
+ team_id=team_id,
125
+ enterprise_id=enterprise_id,
126
+ user_id=user_id
127
+ )
128
+
129
+ if not installation:
130
+ return None
131
+
132
+ # Check if token needs rotation
133
+ if installation.bot_token_expires_at and installation.bot_refresh_token:
134
+ now = datetime.now(timezone.utc).timestamp()
135
+ # Refresh if token expires in less than 1 hour
136
+ if installation.bot_token_expires_at - now < 3600:
137
+ logger.info(f"πŸ”„ Token expiring soon for team {team_id}, rotating...")
138
+ installation = await self._rotate_token(installation)
139
+
140
+ return installation
141
+
142
+ async def _rotate_token(self, installation: Installation) -> Installation:
143
+ """Rotate an expired or expiring token using refresh token."""
144
+ try:
145
+ response = await self._client.oauth_v2_access(
146
+ client_id=SLACK_CLIENT_ID,
147
+ client_secret=SLACK_CLIENT_SECRET,
148
+ grant_type="refresh_token",
149
+ refresh_token=installation.bot_refresh_token,
150
+ )
151
+
152
+ if response["ok"]:
153
+ # Update installation with new tokens
154
+ installation.bot_token = response["access_token"]
155
+ installation.bot_refresh_token = response.get("refresh_token", installation.bot_refresh_token)
156
+ installation.bot_token_expires_at = response.get("expires_in", 0) + int(datetime.now(timezone.utc).timestamp())
157
+
158
+ # Save updated installation
159
+ await self.save(installation)
160
+ logger.info(f"βœ“ Token rotated successfully for team {installation.team_id}")
161
+ return installation
162
+ else:
163
+ logger.error(f"Token rotation failed: {response.get('error')}")
164
+ return installation
165
+ except Exception as e:
166
+ logger.error(f"Error rotating token: {e}")
167
+ return installation
168
+
169
+ async def fetch_installation(
170
+ self,
171
+ team_id: str,
172
+ *,
173
+ enterprise_id: str | None = None,
174
+ user_id: str | None = None
175
+ ) -> Installation | None:
176
+ """Fetch installation from Supabase."""
177
  try:
178
  result = await asyncio.to_thread(
179
  lambda: self.supabase.table("installations").select("*").eq("team_id", team_id).execute()
180
  )
181
  if not result.data:
 
182
  return None
183
 
184
  data = result.data[0]
185
  return Installation(
186
  app_id=data.get("app_id"),
187
+ enterprise_id=data.get("enterprise_id"),
188
  team_id=team_id,
189
+ user_id=data.get("user_id"),
190
  bot_token=data.get("bot_token"),
191
  bot_user_id=data.get("bot_user_id"),
192
  bot_scopes=data.get("bot_scopes", "").split(',') if data.get("bot_scopes") else [],
193
+ bot_refresh_token=data.get("bot_refresh_token"),
194
+ bot_token_expires_at=data.get("bot_token_expires_at"),
195
  )
196
  except Exception as e:
197
  logger.error(f"Failed to fetch installation for team {team_id}: {e}")
198
  return None
199
 
200
+ async def async_delete_installation(
201
+ self,
202
+ *,
203
+ enterprise_id: str | None,
204
+ team_id: str | None,
205
+ user_id: str | None = None,
206
+ ) -> None:
207
+ """Delete installation."""
208
+ if team_id:
209
+ await self.delete(team_id=team_id, enterprise_id=enterprise_id, user_id=user_id)
210
+
211
+ async def delete(
212
+ self,
213
+ team_id: str,
214
+ *,
215
+ enterprise_id: str | None = None,
216
+ user_id: str | None = None
217
+ ) -> None:
218
+ """Delete installation from Supabase."""
219
  try:
220
  await asyncio.to_thread(
221
  lambda: self.supabase.table("installations").delete().eq("team_id", team_id).execute()
 
227
 
228
  # Initialize installation store
229
  installation_store = SupabaseAsyncInstallationStore(supabase)
 
230
 
231
+ # Custom authorize function with token rotation
232
  async def async_authorize(enterprise_id, team_id, user_id):
233
+ """Custom authorization with automatic token rotation."""
234
+ logger.info(f"Authorizing request for team: {team_id}")
 
 
 
235
 
236
+ # Try to find installation (will auto-rotate if needed)
237
+ installation = await installation_store.async_find_installation(
238
  enterprise_id=enterprise_id,
239
  team_id=team_id,
240
+ user_id=user_id
 
 
241
  )
242
+
243
+ if installation:
244
+ return AuthorizeResult(
245
+ enterprise_id=enterprise_id or installation.enterprise_id,
246
+ team_id=team_id,
247
+ user_id=user_id or installation.user_id,
248
+ bot_token=installation.bot_token,
249
+ bot_user_id=installation.bot_user_id,
250
+ )
251
+
252
+ logger.warning(f"No installation found for team {team_id}")
253
+ return None
254
 
255
+ # Initialize Bolt Async App with OAuth
256
  app = AsyncApp(
257
  signing_secret=SLACK_SIGNING_SECRET,
258
+ installation_store=installation_store,
259
+ authorize=async_authorize,
260
+ oauth_settings=AsyncOAuthSettings(
261
+ client_id=SLACK_CLIENT_ID,
262
+ client_secret=SLACK_CLIENT_SECRET,
263
+ scopes=[
264
+ "app_mentions:read",
265
+ "files:read",
266
+ "chat:write",
267
+ "channels:read",
268
+ ],
269
+ install_path="/slack/install",
270
+ redirect_uri_path="/slack/oauth_redirect",
271
+ state_store=FileOAuthStateStore(expiration_seconds=600, base_dir="/tmp/slack_states"),
272
+ ),
273
  process_before_response=True,
274
  )
275
 
 
286
  try:
287
  embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
288
  print("Loading QA model...")
289
+ qa_pipeline = pipeline(
290
+ "question-answering",
291
+ model="deepset/roberta-base-squad2",
292
+ device=0 if device == 'cuda' else -1
293
+ )
294
  print("Models loaded successfully!")
295
  except Exception as e:
296
  logger.error(f"Error loading models: {e}")
297
+ raise RuntimeError("Failed to load required ML models.")
298
 
299
  # --- Utility Functions ---
300
  def download_slack_file(url: str, token: str) -> bytes:
301
+ """Downloads a file from Slack."""
302
  try:
303
  headers = {"Authorization": f"Bearer {token}"}
304
  response = requests.get(url, headers=headers, stream=True, timeout=30)
305
  response.raise_for_status()
306
  return response.content
307
  except requests.RequestException as e:
308
+ logger.error(f"Failed to download Slack file: {e}")
309
  raise
310
 
311
  def extract_text_from_pdf(file_content: bytes) -> str:
312
+ """Extracts text from PDF."""
313
  try:
314
  pdf_reader = pypdf.PdfReader(io.BytesIO(file_content))
315
  text = ""
 
319
  text += extracted + "\n"
320
  return text
321
  except Exception as e:
322
+ logger.error(f"Failed to extract PDF text: {e}")
323
  raise
324
 
325
  def extract_text_from_docx(file_content: bytes) -> str:
326
+ """Extracts text from DOCX."""
327
  try:
328
  doc = Document(io.BytesIO(file_content))
329
+ return "\n".join([p.text for p in doc.paragraphs])
 
 
 
330
  except Exception as e:
331
+ logger.error(f"Failed to extract DOCX text: {e}")
332
  raise
333
 
334
  def chunk_text(text: str, chunk_size: int = 300) -> List[str]:
335
+ """Chunks text by word count."""
336
  words = text.split()
337
+ return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size) if words[i:i + chunk_size]]
 
 
 
 
 
338
 
339
  def embed_text(text: str) -> List[float]:
340
+ """Generates embedding for text."""
341
+ if not embedding_model:
342
  raise RuntimeError("Embedding model not loaded.")
343
+ return embedding_model.encode(text, convert_to_tensor=False).tolist()
 
344
 
345
  async def store_embeddings(chunks: List[str]):
346
+ """Stores text chunks and embeddings in Supabase."""
347
  for chunk in chunks:
 
348
  try:
349
+ embedding = embed_text(chunk)
350
  await asyncio.to_thread(
351
  lambda c=chunk, e=embedding: supabase.table("documents").insert({
352
  "content": c,
 
354
  }).execute()
355
  )
356
  except Exception as e:
357
+ logger.error(f"Failed to insert chunk: {e}")
358
 
359
  async def is_table_empty() -> bool:
360
+ """Checks if documents table is empty."""
361
  try:
362
  result = await asyncio.to_thread(
363
  lambda: supabase.table("documents").select("id", count="exact").limit(1).execute()
364
  )
365
  return result.count == 0
366
  except Exception as e:
367
+ logger.error(f"Error checking table: {e}")
368
  return True
369
 
370
  async def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
371
+ """Searches documents using vector similarity."""
372
+ if not embedding_model:
373
  raise RuntimeError("Embedding model not loaded.")
 
374
  try:
375
+ query_embedding = embed_text(query)
376
  result = await asyncio.to_thread(
377
+ lambda: supabase.rpc("match_documents", {
378
+ "query_embedding": query_embedding,
379
+ "match_count": match_count
380
  }).execute()
381
  )
382
  return result.data
383
  except Exception as e:
384
+ logger.error(f"Error searching documents: {e}")
385
  return []
386
 
387
  async def answer_question(question: str, context: str) -> str:
388
+ """Answers question using QA pipeline."""
389
+ if not qa_pipeline:
390
  raise RuntimeError("QA pipeline not loaded.")
391
  if not context.strip():
392
  return "No relevant documents found."
393
  try:
394
+ context_slice = context[:4096]
 
 
395
  result = await asyncio.to_thread(
396
  lambda: qa_pipeline(question=question, context=context_slice)
397
  )
398
  return result['answer']
399
  except Exception as e:
400
+ logger.error(f"Error in QA pipeline: {e}")
401
+ return "Error generating answer."
402
 
403
+ # --- Slack Event Handlers ---
404
  @app.event("file_shared")
405
  async def handle_file_shared(event, say, client):
406
+ """Processes shared files."""
407
  file_id = event["file_id"]
408
  try:
409
  file_info = await client.files_info(file=file_id)
 
413
  file_url = file_data.get("url_private_download")
414
 
415
  if not file_url:
416
+ await say("No download URL available.")
417
  return
418
 
419
+ file_content = await asyncio.to_thread(download_slack_file, file_url, client.token)
 
420
 
421
  text = ""
422
  if "pdf" in file_type:
 
424
  elif "wordprocessingml" in file_type or "msword" in file_type:
425
  text = await asyncio.to_thread(extract_text_from_docx, file_content)
426
  else:
427
+ await say("❌ Unsupported file type. Please upload PDF or DOCX files.")
428
  return
429
 
430
  if not text.strip():
431
+ await say("❌ No text could be extracted from the file.")
432
  return
433
 
434
  chunks = chunk_text(text)
435
  await store_embeddings(chunks)
436
 
437
+ await say(f"βœ… File processed! Added **{len(chunks)}** chunks to knowledge base.")
438
  except Exception as e:
439
+ logger.error(f"Error processing file {file_id}: {e}")
440
  await say(f"❌ Error processing file: {str(e)}")
441
 
442
  @app.event("app_mention")
443
+ async def handle_mention(event, say):
444
+ """Handles bot mentions."""
445
  text = event["text"]
446
  user_query = re.sub(r'<@[A-Z0-9]+>', '', text).strip()
447
 
448
  if not user_query:
449
+ await say("Please ask me a question!")
450
  return
451
 
452
  try:
453
  if await is_table_empty():
454
+ await say("πŸ“š My knowledge base is empty. Please share PDF or DOCX files first!")
455
  return
456
 
457
  results = await search_documents(user_query, match_count=5)
458
 
459
  if not results:
460
+ await say("πŸ” I couldn't find relevant information. Try uploading more documents.")
461
  return
462
 
463
  context = " ".join([doc["content"] for doc in results])
464
  answer = await answer_question(user_query, context)
465
 
466
+ await say(f"πŸ’‘ **Answer:** {answer}\n\n_Found from {len(results)} document chunks._")
467
  except Exception as e:
468
+ logger.error(f"Error answering query '{user_query}': {e}")
469
+ await say(f"❌ Error: {str(e)}")
470
 
471
+ # --- FastAPI Routes ---
472
  handler = AsyncSlackRequestHandler(app)
473
 
474
  @api.post("/slack/events")
475
  async def slack_events(request: Request):
476
+ """Slack events endpoint."""
477
+ return await handler.handle(request)
478
+
479
+ @api.get("/slack/install")
480
+ async def slack_install(request: Request):
481
+ """OAuth installation initiation."""
482
+ return await handler.handle(request)
483
+
484
+ @api.get("/slack/oauth_redirect")
485
+ async def oauth_redirect(request: Request):
486
+ """OAuth callback handler."""
487
+ return await handler.handle(request)
488
 
489
  @api.get("/")
490
  async def root():
491
+ """Root endpoint."""
492
+ return {
493
+ "status": "βœ… Slack RAG Bot Running",
494
+ "install_url": f"/slack/install",
495
+ "features": ["OAuth2 with token rotation", "RAG with vector search", "PDF/DOCX support"]
496
+ }
497
 
498
  @api.get("/health")
499
  async def health():
500
+ """Health check."""
501
  db_status = "error"
502
  try:
503
  await asyncio.to_thread(
 
505
  )
506
  db_status = "ok"
507
  except Exception as e:
508
+ logger.error(f"DB health check failed: {e}")
 
 
 
 
 
 
 
509
 
510
  return {
511
+ "status": "ok" if db_status == "ok" and embedding_model and qa_pipeline else "error",
512
  "database": db_status,
513
+ "models": "ok" if embedding_model and qa_pipeline else "error",
514
+ "oauth": "enabled",
515
+ "token_rotation": "enabled"
516
  }
517
 
518
+ @api.get("/debug/installations")
519
+ async def debug_installations():
520
+ """Debug endpoint to view installations."""
521
+ try:
522
+ result = await asyncio.to_thread(
523
+ lambda: supabase.table("installations").select("team_id", "bot_user_id", "bot_token_expires_at", "installed_at").execute()
524
+ )
525
+ installations = []
526
+ for inst in result.data:
527
+ expires_at = inst.get("bot_token_expires_at")
528
+ if expires_at:
529
+ expires_dt = datetime.fromtimestamp(expires_at, tz=timezone.utc)
530
+ inst["expires_at_formatted"] = expires_dt.isoformat()
531
+ inst["expires_in_hours"] = round((expires_at - datetime.now(timezone.utc).timestamp()) / 3600, 2)
532
+ installations.append(inst)
533
+ return {"installations": installations, "count": len(installations)}
534
+ except Exception as e:
535
+ return {"error": str(e)}
536
 
537
  if __name__ == "__main__":
538
  port = int(os.environ.get("PORT", 7860))
539
+ logger.info(f"πŸš€ Starting server on port {port}")
540
+ logger.info("πŸ” OAuth2 with token rotation enabled")
541
  uvicorn_run(api, host="0.0.0.0", port=port)