Jacek Zadrożny committed on
Commit
5411262
·
1 Parent(s): 06bb39f

Remove asyncio completely to fix event loop cleanup errors

Browse files

- Convert all async functions to synchronous
- Remove asyncio.to_thread() and event loop management
- Simplify embeddings client (direct API calls with retry logic)
- Simplify agent initialization (no event loops needed)
- Fix all docstring examples to remove await
- Eliminates 'Invalid file descriptor: -1' errors completely

Files changed (4) hide show
  1. agent/a11y_agent.py +6 -7
  2. agent/tools.py +3 -3
  3. app.py +9 -30
  4. models/embeddings.py +51 -67
agent/a11y_agent.py CHANGED
@@ -1,7 +1,6 @@
1
  """A11y Expert - Main accessibility question-answering agent."""
2
 
3
- import asyncio
4
- from typing import Optional, AsyncGenerator
5
  from openai import OpenAI
6
  from langdetect import detect, LangDetectException
7
  from config import get_settings
@@ -53,7 +52,7 @@ class A11yExpertAgent:
53
  except Exception as e:
54
  logger.warning(f"Error closing A11yExpertAgent: {e}")
55
 
56
- async def ask(self, question: str) -> AsyncGenerator[str, None]:
57
  """
58
  Ask a question and get a streaming answer with RAG.
59
 
@@ -77,7 +76,7 @@ class A11yExpertAgent:
77
  current_system_prompt = get_system_prompt(language, self.expertise)
78
 
79
  logger.info("Searching knowledge base...")
80
- context = await search_knowledge_base(question, self.vector_store, language=language)
81
 
82
  messages = [
83
  {"role": "system", "content": current_system_prompt},
@@ -158,12 +157,12 @@ Remember to:
158
  self.conversation_history = []
159
  logger.info("Conversation history cleared")
160
 
161
- async def batch_ask(self, questions: list[str]) -> list[dict]:
162
  """Ask multiple questions in sequence."""
163
  results = []
164
  for question in questions:
165
  try:
166
- answer_chunks = [chunk async for chunk in self.ask(question)]
167
  answer = "".join(answer_chunks)
168
  results.append({"question": question, "answer": answer, "success": True})
169
  except Exception as e:
@@ -172,7 +171,7 @@ Remember to:
172
  return results
173
 
174
 
175
- async def create_agent(language: Optional[str] = None) -> A11yExpertAgent:
176
  """Factory function to create and initialize agent."""
177
  language = language or "en"
178
 
 
1
  """A11y Expert - Main accessibility question-answering agent."""
2
 
3
+ from typing import Optional, Generator
 
4
  from openai import OpenAI
5
  from langdetect import detect, LangDetectException
6
  from config import get_settings
 
52
  except Exception as e:
53
  logger.warning(f"Error closing A11yExpertAgent: {e}")
54
 
55
+ def ask(self, question: str) -> Generator[str, None, None]:
56
  """
57
  Ask a question and get a streaming answer with RAG.
58
 
 
76
  current_system_prompt = get_system_prompt(language, self.expertise)
77
 
78
  logger.info("Searching knowledge base...")
79
+ context = search_knowledge_base(question, self.vector_store, language=language)
80
 
81
  messages = [
82
  {"role": "system", "content": current_system_prompt},
 
157
  self.conversation_history = []
158
  logger.info("Conversation history cleared")
159
 
160
+ def batch_ask(self, questions: list[str]) -> list[dict]:
161
  """Ask multiple questions in sequence."""
162
  results = []
163
  for question in questions:
164
  try:
165
+ answer_chunks = [chunk for chunk in self.ask(question)]
166
  answer = "".join(answer_chunks)
167
  results.append({"question": question, "answer": answer, "success": True})
168
  except Exception as e:
 
171
  return results
172
 
173
 
174
+ def create_agent(language: Optional[str] = None) -> A11yExpertAgent:
175
  """Factory function to create and initialize agent."""
176
  language = language or "en"
177
 
agent/tools.py CHANGED
@@ -5,7 +5,7 @@ from database.vector_store_client import VectorStoreClient
5
  from models.embeddings import get_embeddings_client
6
  from loguru import logger
7
 
8
- async def search_knowledge_base(
9
  query: str,
10
  vector_store: VectorStoreClient,
11
  language: str = "en"
@@ -25,7 +25,7 @@ async def search_knowledge_base(
25
  logger.info(f"Query: {query} (language: {language})")
26
 
27
  embeddings_client = get_embeddings_client()
28
- query_embedding = await embeddings_client.get_embedding(query)
29
 
30
  where_clause = f"language = '{language}'"
31
  results = vector_store.search(
@@ -50,7 +50,7 @@ async def search_knowledge_base(
50
  logger.error(f"Search failed: {e}")
51
  return f"Error searching knowledge base: {str(e)}"
52
 
53
- async def get_database_stats(vector_store: VectorStoreClient) -> str:
54
  """
55
  Get statistics about the knowledge base.
56
 
 
5
  from models.embeddings import get_embeddings_client
6
  from loguru import logger
7
 
8
+ def search_knowledge_base(
9
  query: str,
10
  vector_store: VectorStoreClient,
11
  language: str = "en"
 
25
  logger.info(f"Query: {query} (language: {language})")
26
 
27
  embeddings_client = get_embeddings_client()
28
+ query_embedding = embeddings_client.get_embedding(query)
29
 
30
  where_clause = f"language = '{language}'"
31
  results = vector_store.search(
 
50
  logger.error(f"Search failed: {e}")
51
  return f"Error searching knowledge base: {str(e)}"
52
 
53
+ def get_database_stats(vector_store: VectorStoreClient) -> str:
54
  """
55
  Get statistics about the knowledge base.
56
 
app.py CHANGED
@@ -3,13 +3,11 @@ Gradio UI for the A11y Expert Agent with lazy initialization.
3
  This module creates a Gradio ChatInterface that starts FAST,
4
  then initializes the agent in the background.
5
  """
6
- import asyncio
7
  import gradio as gr
8
  from loguru import logger
9
  import sys
10
  import atexit
11
  import threading
12
- import time
13
  from agent.a11y_agent import create_agent, A11yExpertAgent
14
  from config import get_settings
15
 
@@ -22,30 +20,16 @@ logger.add(sys.stderr, level=get_settings().log_level)
22
  agent_instance: A11yExpertAgent = None
23
  agent_ready = False
24
  agent_error = None
25
- agent_loop = None # Keep reference to prevent garbage collection
26
 
27
  # --- Agent Initialization ---
28
  def initialize_agent_background():
29
  """Initialize the agent in background thread."""
30
- global agent_instance, agent_ready, agent_error, agent_loop
31
  try:
32
  logger.info("🔄 Starting agent initialization in background...")
33
-
34
- # Create a new event loop for this thread and set it as current
35
- agent_loop = asyncio.new_event_loop()
36
- asyncio.set_event_loop(agent_loop)
37
-
38
- try:
39
- # Run the async initialization
40
- async def init():
41
- return await create_agent()
42
-
43
- agent_instance = agent_loop.run_until_complete(init())
44
- agent_ready = True
45
- logger.success("✅ A11y Expert Agent is ready!")
46
- finally:
47
- # Don't close the loop here - we need it for the respond function
48
- pass
49
  except Exception as e:
50
  logger.error(f"❌ Failed to initialize agent: {e}")
51
  agent_error = str(e)
@@ -53,7 +37,7 @@ def initialize_agent_background():
53
 
54
  def cleanup_resources():
55
  """Clean up resources on app shutdown."""
56
- global agent_instance, agent_loop
57
  logger.info("Cleaning up resources...")
58
  try:
59
  # Close agent and all its resources
@@ -65,17 +49,12 @@ def cleanup_resources():
65
  if hasattr(get_embeddings_client, '_instance'):
66
  get_embeddings_client._instance.close()
67
 
68
- # Properly close the event loop
69
- if agent_loop and not agent_loop.is_closed():
70
- agent_loop.close()
71
- logger.info("Event loop closed")
72
-
73
  logger.success("✅ Resources cleaned up successfully")
74
  except Exception as e:
75
  logger.warning(f"Error during cleanup: {e}")
76
 
77
  # --- Gradio Chat Logic ---
78
- async def respond(message: str, history: list[list[str]]):
79
  """
80
  Main function for the Gradio ChatInterface.
81
  Receives a user message and chat history, then uses the agent
@@ -96,8 +75,9 @@ async def respond(message: str, history: list[list[str]]):
96
 
97
  yield "⏳ Agent is initializing, please wait..."
98
  # Wait up to 120 seconds for agent to be ready
 
99
  for i in range(120):
100
- await asyncio.sleep(1)
101
  if agent_ready:
102
  break
103
  if agent_error:
@@ -115,8 +95,7 @@ async def respond(message: str, history: list[list[str]]):
115
  logger.info(f"User query: '{message}'")
116
  full_response = ""
117
  try:
118
- # Use the global event loop to run async generator
119
- async for chunk in agent_instance.ask(message):
120
  full_response += chunk
121
  yield full_response
122
  except Exception as e:
 
3
  This module creates a Gradio ChatInterface that starts FAST,
4
  then initializes the agent in the background.
5
  """
 
6
  import gradio as gr
7
  from loguru import logger
8
  import sys
9
  import atexit
10
  import threading
 
11
  from agent.a11y_agent import create_agent, A11yExpertAgent
12
  from config import get_settings
13
 
 
20
  agent_instance: A11yExpertAgent = None
21
  agent_ready = False
22
  agent_error = None
 
23
 
24
  # --- Agent Initialization ---
25
  def initialize_agent_background():
26
  """Initialize the agent in background thread."""
27
+ global agent_instance, agent_ready, agent_error
28
  try:
29
  logger.info("🔄 Starting agent initialization in background...")
30
+ agent_instance = create_agent()
31
+ agent_ready = True
32
+ logger.success("✅ A11y Expert Agent is ready!")
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  except Exception as e:
34
  logger.error(f"❌ Failed to initialize agent: {e}")
35
  agent_error = str(e)
 
37
 
38
  def cleanup_resources():
39
  """Clean up resources on app shutdown."""
40
+ global agent_instance
41
  logger.info("Cleaning up resources...")
42
  try:
43
  # Close agent and all its resources
 
49
  if hasattr(get_embeddings_client, '_instance'):
50
  get_embeddings_client._instance.close()
51
 
 
 
 
 
 
52
  logger.success("✅ Resources cleaned up successfully")
53
  except Exception as e:
54
  logger.warning(f"Error during cleanup: {e}")
55
 
56
  # --- Gradio Chat Logic ---
57
+ def respond(message: str, history: list[list[str]]):
58
  """
59
  Main function for the Gradio ChatInterface.
60
  Receives a user message and chat history, then uses the agent
 
75
 
76
  yield "⏳ Agent is initializing, please wait..."
77
  # Wait up to 120 seconds for agent to be ready
78
+ import time
79
  for i in range(120):
80
+ time.sleep(1)
81
  if agent_ready:
82
  break
83
  if agent_error:
 
95
  logger.info(f"User query: '{message}'")
96
  full_response = ""
97
  try:
98
+ for chunk in agent_instance.ask(message):
 
99
  full_response += chunk
100
  yield full_response
101
  except Exception as e:
models/embeddings.py CHANGED
@@ -12,7 +12,6 @@ from functools import wraps
12
  from openai import OpenAI, RateLimitError
13
  from config import get_settings
14
  from loguru import logger
15
- import asyncio
16
 
17
  try:
18
  from diskcache import Cache
@@ -22,41 +21,6 @@ except ImportError:
22
  logger.warning("diskcache not available - embeddings caching disabled")
23
 
24
 
25
- def retry_on_rate_limit(max_retries: int = 5):
26
- """
27
- Decorator for retrying async functions on rate limit with exponential backoff.
28
-
29
- Args:
30
- max_retries: Maximum number of retry attempts
31
-
32
- Returns:
33
- Decorated function with retry logic
34
-
35
- Examples:
36
- >>> @retry_on_rate_limit(max_retries=3)
37
- ... async def my_api_call():
38
- ... return await some_api()
39
- """
40
- def decorator(func):
41
- @wraps(func)
42
- async def wrapper(*args, **kwargs):
43
- for attempt in range(max_retries):
44
- try:
45
- return await func(*args, **kwargs)
46
- except RateLimitError as e:
47
- if attempt == max_retries - 1:
48
- raise
49
- wait_time = (2 ** attempt) * 2 # Exponential: 2s, 4s, 8s, 16s, 32s
50
- logger.warning(
51
- f"Rate limited. Retrying in {wait_time}s "
52
- f"(attempt {attempt + 1}/{max_retries})"
53
- )
54
- await asyncio.sleep(wait_time)
55
- raise RuntimeError(f"Failed after {max_retries} retries")
56
- return wrapper
57
- return decorator
58
-
59
-
60
  class EmbeddingsClient:
61
  """
62
  Client for generating embeddings with caching and retry logic.
@@ -75,12 +39,12 @@ class EmbeddingsClient:
75
 
76
  Examples:
77
  >>> client = EmbeddingsClient()
78
- >>> embedding = await client.get_embedding("Hello world")
79
  >>> len(embedding)
80
  3072
81
 
82
  >>> # Second call uses cache
83
- >>> embedding2 = await client.get_embedding("Hello world")
84
  >>> embedding == embedding2
85
  True
86
  """
@@ -133,27 +97,36 @@ class EmbeddingsClient:
133
  """
134
  return hashlib.md5(f"{self.model}:{text}".encode()).hexdigest()
135
 
136
- @retry_on_rate_limit(max_retries=5)
137
- async def _get_embedding_uncached(self, text: str) -> List[float]:
138
  """
139
  Generate embedding without cache (internal method).
140
 
141
  Args:
142
  text: Input text (already truncated)
 
143
 
144
  Returns:
145
  Embedding vector
146
  """
147
- response = await asyncio.to_thread(
148
- self.client.embeddings.create,
149
- model=self.model,
150
- input=text
151
- )
152
- embedding = response.data[0].embedding
153
- logger.debug(f"Generated embedding (dim={len(embedding)})")
154
- return embedding
 
 
 
 
 
 
 
 
 
155
 
156
- async def get_embedding(self, text: str) -> List[float]:
157
  """
158
  Generate embedding for text with caching.
159
 
@@ -167,7 +140,7 @@ class EmbeddingsClient:
167
  List of float values representing the embedding
168
 
169
  Examples:
170
- >>> embedding = await client.get_embedding("Hello world")
171
  >>> len(embedding)
172
  3072
173
  """
@@ -182,7 +155,7 @@ class EmbeddingsClient:
182
  return self.cache[cache_key]
183
 
184
  # Generate embedding
185
- embedding = await self._get_embedding_uncached(text)
186
 
187
  # Store in cache
188
  if self.cache is not None:
@@ -191,30 +164,40 @@ class EmbeddingsClient:
191
 
192
  return embedding
193
 
194
- @retry_on_rate_limit(max_retries=3)
195
- async def _get_embeddings_batch_uncached(
196
  self,
197
- texts: List[str]
 
198
  ) -> List[List[float]]:
199
  """
200
  Generate embeddings for batch without cache (internal method).
201
 
202
  Args:
203
  texts: List of texts (already truncated)
 
204
 
205
  Returns:
206
  List of embedding vectors
207
  """
208
- response = await asyncio.to_thread(
209
- self.client.embeddings.create,
210
- model=self.model,
211
- input=texts
212
- )
213
- # Sort by index to maintain order
214
- batch_embeddings = sorted(response.data, key=lambda x: x.index)
215
- return [e.embedding for e in batch_embeddings]
 
 
 
 
 
 
 
 
 
216
 
217
- async def get_embeddings_batch(
218
  self,
219
  texts: List[str],
220
  batch_size: int = 100
@@ -234,10 +217,11 @@ class EmbeddingsClient:
234
 
235
  Examples:
236
  >>> texts = ["Hello", "World", "!"]
237
- >>> embeddings = await client.get_embeddings_batch(texts)
238
  >>> len(embeddings)
239
  3
240
  """
 
241
  all_embeddings = []
242
 
243
  for i in range(0, len(texts), batch_size):
@@ -280,7 +264,7 @@ class EmbeddingsClient:
280
  # Generate embeddings for cache misses
281
  if texts_to_generate:
282
  try:
283
- generated = await self._get_embeddings_batch_uncached(
284
  texts_to_generate
285
  )
286
 
@@ -306,7 +290,7 @@ class EmbeddingsClient:
306
 
307
  # Small delay between batches to avoid rate limiting
308
  if num_batches > 1 and current_batch_num < num_batches:
309
- await asyncio.sleep(0.5)
310
 
311
  logger.success(f" 🧠 Generated {len(all_embeddings)} embeddings total.")
312
  return all_embeddings
@@ -321,7 +305,7 @@ def get_embeddings_client() -> EmbeddingsClient:
321
 
322
  Examples:
323
  >>> client = get_embeddings_client()
324
- >>> embedding = await client.get_embedding("test")
325
  """
326
  if not hasattr(get_embeddings_client, '_instance'):
327
  get_embeddings_client._instance = EmbeddingsClient()
 
12
  from openai import OpenAI, RateLimitError
13
  from config import get_settings
14
  from loguru import logger
 
15
 
16
  try:
17
  from diskcache import Cache
 
21
  logger.warning("diskcache not available - embeddings caching disabled")
22
 
23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
  class EmbeddingsClient:
25
  """
26
  Client for generating embeddings with caching and retry logic.
 
39
 
40
  Examples:
41
  >>> client = EmbeddingsClient()
42
+ >>> embedding = client.get_embedding("Hello world")
43
  >>> len(embedding)
44
  3072
45
 
46
  >>> # Second call uses cache
47
+ >>> embedding2 = client.get_embedding("Hello world")
48
  >>> embedding == embedding2
49
  True
50
  """
 
97
  """
98
  return hashlib.md5(f"{self.model}:{text}".encode()).hexdigest()
99
 
100
+ def _get_embedding_uncached(self, text: str, retry_count: int = 5) -> List[float]:
 
101
  """
102
  Generate embedding without cache (internal method).
103
 
104
  Args:
105
  text: Input text (already truncated)
106
+ retry_count: Number of retries on rate limit
107
 
108
  Returns:
109
  Embedding vector
110
  """
111
+ for attempt in range(retry_count):
112
+ try:
113
+ response = self.client.embeddings.create(
114
+ model=self.model,
115
+ input=text
116
+ )
117
+ embedding = response.data[0].embedding
118
+ logger.debug(f"Generated embedding (dim={len(embedding)})")
119
+ return embedding
120
+ except RateLimitError as e:
121
+ if attempt == retry_count - 1:
122
+ raise
123
+ wait_time = (2 ** attempt) * 2
124
+ logger.warning(f"Rate limited. Retrying in {wait_time}s (attempt {attempt + 1}/{retry_count})")
125
+ import time
126
+ time.sleep(wait_time)
127
+ raise RuntimeError(f"Failed after {retry_count} retries")
128
 
129
+ def get_embedding(self, text: str) -> List[float]:
130
  """
131
  Generate embedding for text with caching.
132
 
 
140
  List of float values representing the embedding
141
 
142
  Examples:
143
+ >>> embedding = client.get_embedding("Hello world")
144
  >>> len(embedding)
145
  3072
146
  """
 
155
  return self.cache[cache_key]
156
 
157
  # Generate embedding
158
+ embedding = self._get_embedding_uncached(text)
159
 
160
  # Store in cache
161
  if self.cache is not None:
 
164
 
165
  return embedding
166
 
167
+ def _get_embeddings_batch_uncached(
 
168
  self,
169
+ texts: List[str],
170
+ retry_count: int = 3
171
  ) -> List[List[float]]:
172
  """
173
  Generate embeddings for batch without cache (internal method).
174
 
175
  Args:
176
  texts: List of texts (already truncated)
177
+ retry_count: Number of retries on rate limit
178
 
179
  Returns:
180
  List of embedding vectors
181
  """
182
+ for attempt in range(retry_count):
183
+ try:
184
+ response = self.client.embeddings.create(
185
+ model=self.model,
186
+ input=texts
187
+ )
188
+ # Sort by index to maintain order
189
+ batch_embeddings = sorted(response.data, key=lambda x: x.index)
190
+ return [e.embedding for e in batch_embeddings]
191
+ except RateLimitError as e:
192
+ if attempt == retry_count - 1:
193
+ raise
194
+ wait_time = (2 ** attempt) * 2
195
+ logger.warning(f"Rate limited. Retrying in {wait_time}s (attempt {attempt + 1}/{retry_count})")
196
+ import time
197
+ time.sleep(wait_time)
198
+ raise RuntimeError(f"Failed after {retry_count} retries")
199
 
200
+ def get_embeddings_batch(
201
  self,
202
  texts: List[str],
203
  batch_size: int = 100
 
217
 
218
  Examples:
219
  >>> texts = ["Hello", "World", "!"]
220
+ >>> embeddings = client.get_embeddings_batch(texts)
221
  >>> len(embeddings)
222
  3
223
  """
224
+ import time
225
  all_embeddings = []
226
 
227
  for i in range(0, len(texts), batch_size):
 
264
  # Generate embeddings for cache misses
265
  if texts_to_generate:
266
  try:
267
+ generated = self._get_embeddings_batch_uncached(
268
  texts_to_generate
269
  )
270
 
 
290
 
291
  # Small delay between batches to avoid rate limiting
292
  if num_batches > 1 and current_batch_num < num_batches:
293
+ time.sleep(0.5)
294
 
295
  logger.success(f" 🧠 Generated {len(all_embeddings)} embeddings total.")
296
  return all_embeddings
 
305
 
306
  Examples:
307
  >>> client = get_embeddings_client()
308
+ >>> embedding = client.get_embedding("test")
309
  """
310
  if not hasattr(get_embeddings_client, '_instance'):
311
  get_embeddings_client._instance = EmbeddingsClient()