NeerajCodz commited on
Commit
91e7236
·
verified ·
1 Parent(s): da3bac8
Files changed (1) hide show
  1. app.py +32 -43
app.py CHANGED
@@ -11,7 +11,7 @@ from fastapi.responses import HTMLResponse
11
  from uvicorn import run as uvicorn_run
12
 
13
  # Slack Libraries (Async versions)
14
- from slack_bolt import AsyncApp
15
  from slack_bolt.adapter.fastapi import AsyncSlackRequestHandler
16
  from slack_sdk.oauth.installation_store.async_installation_store import AsyncInstallationStore
17
  from slack_sdk.oauth import AuthorizeUrlGenerator
@@ -67,7 +67,6 @@ class SupabaseAsyncInstallationStore(AsyncInstallationStore):
67
  self.supabase = supabase_client
68
 
69
  async def save(self, installation: Installation) -> None:
70
- loop = asyncio.get_event_loop()
71
  data = {
72
  "team_id": installation.team_id,
73
  "bot_token": installation.bot_token,
@@ -76,21 +75,19 @@ class SupabaseAsyncInstallationStore(AsyncInstallationStore):
76
  "app_id": installation.app_id,
77
  }
78
  try:
79
- await loop.run_in_executor(
80
- None,
81
- lambda: self.supabase.table("installations").upsert(data, on_conflict="team_id").execute()
82
  )
 
83
  logger.info(f"Saved installation for team: {installation.team_id}")
84
  except Exception as e:
85
  logger.error(f"Failed to save installation for team {installation.team_id}: {e}")
86
  raise
87
 
88
  async def fetch_installation(self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None) -> Installation | None:
89
- loop = asyncio.get_event_loop()
90
  try:
91
- result = await loop.run_in_executor(
92
- None,
93
- lambda: self.supabase.table("installations").select("*").eq("team_id", team_id).execute()
94
  )
95
  if not result.data:
96
  return None
@@ -111,11 +108,9 @@ class SupabaseAsyncInstallationStore(AsyncInstallationStore):
111
  raise
112
 
113
  async def delete(self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None) -> None:
114
- loop = asyncio.get_event_loop()
115
  try:
116
- await loop.run_in_executor(
117
- None,
118
- lambda: self.supabase.table("installations").delete().eq("team_id", team_id).execute()
119
  )
120
  logger.info(f"Deleted installation for team: {team_id}")
121
  except Exception as e:
@@ -209,44 +204,38 @@ def embed_text(text: str) -> List[float]:
209
  embedding = embedding_model.encode(text, convert_to_tensor=False)
210
  return embedding.tolist()
211
 
212
- def store_embeddings(chunks: List[str]):
213
  """Stores text chunks and their embeddings in Supabase."""
214
- loop = asyncio.get_event_loop()
215
  for chunk in chunks:
216
  embedding = embed_text(chunk)
217
  try:
218
- loop.run_in_executor(
219
- None,
220
- lambda c=chunk, e=embedding: supabase.table("documents").insert({
221
- "content": c,
222
- "embedding": e
223
- }).execute()
224
  )
225
  except Exception as e:
226
  logger.error(f"Failed to insert chunk into Supabase: {str(e)}")
227
 
228
- def is_table_empty() -> bool:
229
  """Checks if the documents table has any records."""
230
  try:
231
- loop = asyncio.get_event_loop()
232
- result = loop.run_in_executor(
233
- None,
234
- lambda: supabase.table("documents").select("id", count="exact").limit(1).execute()
235
  )
236
  return result.count == 0
237
  except Exception as e:
238
  logger.error(f"Error checking table emptiness: {str(e)}")
239
  return True
240
 
241
- def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
242
  """Searches Supabase for documents matching the query using vector similarity."""
243
  if embedding_model is None:
244
  raise RuntimeError("Embedding model not loaded.")
245
  query_embedding = embed_text(query)
246
  try:
247
- loop = asyncio.get_event_loop()
248
- result = loop.run_in_executor(
249
- None,
250
  lambda: supabase.rpc("match_documents", {
251
  "query_embedding": query_embedding,
252
  "match_count": match_count
@@ -257,7 +246,7 @@ def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
257
  logger.error(f"Error searching documents for query '{query}': {str(e)}")
258
  return []
259
 
260
- def answer_question(question: str, context: str) -> str:
261
  """Answers a question based on the provided context using the QA pipeline."""
262
  if qa_pipeline is None:
263
  raise RuntimeError("QA pipeline not loaded.")
@@ -267,7 +256,9 @@ def answer_question(question: str, context: str) -> str:
267
  MAX_CONTEXT_LEN = 4096
268
  context_slice = context[:MAX_CONTEXT_LEN]
269
 
270
- result = qa_pipeline(question=question, context=context_slice)
 
 
271
  return result['answer']
272
  except Exception as e:
273
  logger.error(f"Error in QA pipeline: {str(e)}")
@@ -291,13 +282,13 @@ async def handle_file_shared(event, say, client):
291
  return
292
 
293
  token = client.token
294
- file_content = download_slack_file(file_url, token)
295
 
296
  text = ""
297
  if "pdf" in file_type:
298
- text = extract_text_from_pdf(file_content)
299
  elif "wordprocessingml" in file_type or "msword" in file_type:
300
- text = extract_text_from_docx(file_content)
301
  else:
302
  await say("Unsupported file type. Please upload PDF or DOCX files.")
303
  return
@@ -307,7 +298,7 @@ async def handle_file_shared(event, say, client):
307
  return
308
 
309
  chunks = chunk_text(text)
310
- store_embeddings(chunks)
311
 
312
  await say(f"✅ File processed successfully! Added **{len(chunks)}** chunks to the knowledge base.")
313
  except Exception as e:
@@ -337,11 +328,11 @@ async def handle_mention(event, say, client):
337
 
338
  try:
339
  # Check if table is empty
340
- if await asyncio.to_thread(is_table_empty):
341
  await say("My knowledge base is empty. Please share some PDF or DOCX files first so I can learn!")
342
  return
343
 
344
- results = search_documents(user_query, match_count=5)
345
 
346
  if not results:
347
  await say("I couldn't find any relevant information in my knowledge base. Try uploading more documents.")
@@ -349,7 +340,7 @@ async def handle_mention(event, say, client):
349
 
350
  # Combine the content of the top documents into a single context string
351
  context = " ".join([doc["content"] for doc in results])
352
- answer = answer_question(user_query, context)
353
 
354
  # Format the final response
355
  await say(f"💡 *Answer:* {answer}\n\n_I found this information by analyzing {len(results)} relevant document chunks._")
@@ -381,10 +372,8 @@ async def health():
381
  """Health check endpoint."""
382
  db_status = "error"
383
  try:
384
- loop = asyncio.get_event_loop()
385
- await loop.run_in_executor(
386
- None,
387
- lambda: supabase.table("installations").select("team_id").limit(1).execute()
388
  )
389
  db_status = "ok"
390
  except Exception as e:
 
11
  from uvicorn import run as uvicorn_run
12
 
13
  # Slack Libraries (Async versions)
14
+ from slack_bolt.async_app import AsyncApp
15
  from slack_bolt.adapter.fastapi import AsyncSlackRequestHandler
16
  from slack_sdk.oauth.installation_store.async_installation_store import AsyncInstallationStore
17
  from slack_sdk.oauth import AuthorizeUrlGenerator
 
67
  self.supabase = supabase_client
68
 
69
  async def save(self, installation: Installation) -> None:
 
70
  data = {
71
  "team_id": installation.team_id,
72
  "bot_token": installation.bot_token,
 
75
  "app_id": installation.app_id,
76
  }
77
  try:
78
+ await asyncio.to_thread(
79
+ self.supabase.table("installations").upsert, data, on_conflict="team_id"
 
80
  )
81
+ await asyncio.to_thread(self.supabase.table("installations").upsert(data, on_conflict="team_id").execute)
82
  logger.info(f"Saved installation for team: {installation.team_id}")
83
  except Exception as e:
84
  logger.error(f"Failed to save installation for team {installation.team_id}: {e}")
85
  raise
86
 
87
  async def fetch_installation(self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None) -> Installation | None:
 
88
  try:
89
+ result = await asyncio.to_thread(
90
+ self.supabase.table("installations").select("*").eq("team_id", team_id).execute
 
91
  )
92
  if not result.data:
93
  return None
 
108
  raise
109
 
110
  async def delete(self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None) -> None:
 
111
  try:
112
+ await asyncio.to_thread(
113
+ self.supabase.table("installations").delete().eq("team_id", team_id).execute
 
114
  )
115
  logger.info(f"Deleted installation for team: {team_id}")
116
  except Exception as e:
 
204
  embedding = embedding_model.encode(text, convert_to_tensor=False)
205
  return embedding.tolist()
206
 
207
+ async def store_embeddings(chunks: List[str]):
208
  """Stores text chunks and their embeddings in Supabase."""
 
209
  for chunk in chunks:
210
  embedding = embed_text(chunk)
211
  try:
212
+ await asyncio.to_thread(
213
+ supabase.table("documents").insert({
214
+ "content": chunk,
215
+ "embedding": embedding
216
+ }).execute
 
217
  )
218
  except Exception as e:
219
  logger.error(f"Failed to insert chunk into Supabase: {str(e)}")
220
 
221
+ async def is_table_empty() -> bool:
222
  """Checks if the documents table has any records."""
223
  try:
224
+ result = await asyncio.to_thread(
225
+ supabase.table("documents").select("id", count="exact").limit(1).execute
 
 
226
  )
227
  return result.count == 0
228
  except Exception as e:
229
  logger.error(f"Error checking table emptiness: {str(e)}")
230
  return True
231
 
232
+ async def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
233
  """Searches Supabase for documents matching the query using vector similarity."""
234
  if embedding_model is None:
235
  raise RuntimeError("Embedding model not loaded.")
236
  query_embedding = embed_text(query)
237
  try:
238
+ result = await asyncio.to_thread(
 
 
239
  lambda: supabase.rpc("match_documents", {
240
  "query_embedding": query_embedding,
241
  "match_count": match_count
 
246
  logger.error(f"Error searching documents for query '{query}': {str(e)}")
247
  return []
248
 
249
+ async def answer_question(question: str, context: str) -> str:
250
  """Answers a question based on the provided context using the QA pipeline."""
251
  if qa_pipeline is None:
252
  raise RuntimeError("QA pipeline not loaded.")
 
256
  MAX_CONTEXT_LEN = 4096
257
  context_slice = context[:MAX_CONTEXT_LEN]
258
 
259
+ result = await asyncio.to_thread(
260
+ qa_pipeline, question=question, context=context_slice
261
+ )
262
  return result['answer']
263
  except Exception as e:
264
  logger.error(f"Error in QA pipeline: {str(e)}")
 
282
  return
283
 
284
  token = client.token
285
+ file_content = await asyncio.to_thread(download_slack_file, file_url, token)
286
 
287
  text = ""
288
  if "pdf" in file_type:
289
+ text = await asyncio.to_thread(extract_text_from_pdf, file_content)
290
  elif "wordprocessingml" in file_type or "msword" in file_type:
291
+ text = await asyncio.to_thread(extract_text_from_docx, file_content)
292
  else:
293
  await say("Unsupported file type. Please upload PDF or DOCX files.")
294
  return
 
298
  return
299
 
300
  chunks = chunk_text(text)
301
+ await store_embeddings(chunks)
302
 
303
  await say(f"✅ File processed successfully! Added **{len(chunks)}** chunks to the knowledge base.")
304
  except Exception as e:
 
328
 
329
  try:
330
  # Check if table is empty
331
+ if await is_table_empty():
332
  await say("My knowledge base is empty. Please share some PDF or DOCX files first so I can learn!")
333
  return
334
 
335
+ results = await search_documents(user_query, match_count=5)
336
 
337
  if not results:
338
  await say("I couldn't find any relevant information in my knowledge base. Try uploading more documents.")
 
340
 
341
  # Combine the content of the top documents into a single context string
342
  context = " ".join([doc["content"] for doc in results])
343
+ answer = await answer_question(user_query, context)
344
 
345
  # Format the final response
346
  await say(f"💡 *Answer:* {answer}\n\n_I found this information by analyzing {len(results)} relevant document chunks._")
 
372
  """Health check endpoint."""
373
  db_status = "error"
374
  try:
375
+ await asyncio.to_thread(
376
+ supabase.table("installations").select("team_id").limit(1).execute
 
 
377
  )
378
  db_status = "ok"
379
  except Exception as e: