sethmcknight commited on
Commit
6d37c4a
·
1 Parent(s): b3b90ec

fix: Add lock to prevent ingestion race condition

Browse files
Files changed (1) hide show
  1. src/app_factory.py +40 -12
src/app_factory.py CHANGED
@@ -6,6 +6,7 @@ This approach allows for easier testing and management of application state.
6
  import concurrent.futures
7
  import logging
8
  import os
 
9
  from typing import Any, Dict
10
 
11
  from dotenv import load_dotenv
@@ -27,21 +28,43 @@ def ensure_embeddings_on_startup():
27
  """
28
  Ensure embeddings exist and have the correct dimension on app startup.
29
  This is critical for Render deployments where the vector store is ephemeral.
 
30
  """
31
- from src.config import (
32
- COLLECTION_NAME,
33
- CORPUS_DIRECTORY,
34
- DEFAULT_CHUNK_SIZE,
35
- DEFAULT_OVERLAP,
36
- EMBEDDING_DIMENSION,
37
- EMBEDDING_MODEL_NAME,
38
- RANDOM_SEED,
39
- VECTOR_DB_PERSIST_PATH,
40
- )
41
- from src.ingestion.ingestion_pipeline import IngestionPipeline
42
- from src.vector_store.vector_db import VectorDatabase
 
 
 
43
 
44
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  logging.info("Checking vector store on startup...")
46
 
47
  # Initialize vector database to check its state
@@ -85,6 +108,11 @@ def ensure_embeddings_on_startup():
85
  logging.error(f"Failed to ensure embeddings on startup: {e}")
86
  # Don't crash the app, but log the error
87
  # The app will still start but searches may fail
 
 
 
 
 
88
 
89
 
90
  def create_app(
 
6
  import concurrent.futures
7
  import logging
8
  import os
9
+ import time
10
  from typing import Any, Dict
11
 
12
  from dotenv import load_dotenv
 
28
  """
29
  Ensure embeddings exist and have the correct dimension on app startup.
30
  This is critical for Render deployments where the vector store is ephemeral.
31
+ Uses a file-based lock to prevent race conditions between workers.
32
  """
33
+ lock_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "locks")
34
+ if not os.path.exists(lock_dir):
35
+ os.makedirs(lock_dir)
36
+ lock_file = os.path.join(lock_dir, "ingestion.lock")
37
+ lock_timeout = 180 # 3 minutes
38
+
39
+ start_time = time.time()
40
+ while os.path.exists(lock_file):
41
+ if time.time() - start_time > lock_timeout:
42
+ logging.error(f"Lock file {lock_file} has been present for over {lock_timeout} seconds. Aborting wait.")
43
+ # In a real-world scenario, you might want to raise an exception
44
+ # or attempt to delete a stale lock file. For now, we just stop waiting.
45
+ return
46
+ logging.info(f"Another process is handling ingestion. Waiting for lock file {lock_file} to be released...")
47
+ time.sleep(5)
48
 
49
  try:
50
+ # Acquire lock
51
+ with open(lock_file, "w") as f:
52
+ f.write(str(os.getpid()))
53
+ logging.info(f"Acquired ingestion lock: {lock_file}")
54
+
55
+ from src.config import (
56
+ COLLECTION_NAME,
57
+ CORPUS_DIRECTORY,
58
+ DEFAULT_CHUNK_SIZE,
59
+ DEFAULT_OVERLAP,
60
+ EMBEDDING_DIMENSION,
61
+ EMBEDDING_MODEL_NAME,
62
+ RANDOM_SEED,
63
+ VECTOR_DB_PERSIST_PATH,
64
+ )
65
+ from src.ingestion.ingestion_pipeline import IngestionPipeline
66
+ from src.vector_store.vector_db import VectorDatabase
67
+
68
  logging.info("Checking vector store on startup...")
69
 
70
  # Initialize vector database to check its state
 
108
  logging.error(f"Failed to ensure embeddings on startup: {e}")
109
  # Don't crash the app, but log the error
110
  # The app will still start but searches may fail
111
+ finally:
112
+ # Release lock
113
+ if os.path.exists(lock_file):
114
+ os.remove(lock_file)
115
+ logging.info(f"Released ingestion lock: {lock_file}")
116
 
117
 
118
  def create_app(