kishanAmaliya commited on
Commit
2e5fc0d
·
1 Parent(s): ef32b53

Deployment: Synchronized strategies for hybrid cloud-native environments

Browse files
Files changed (1) hide show
  1. app/LLD/qdrant_strategy.py +25 -16
app/LLD/qdrant_strategy.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import uuid
2
  from typing import List, Dict, Any
3
  from qdrant_client import QdrantClient
@@ -8,31 +9,46 @@ from app.core.config import settings
8
  class QdrantVectorStoreStrategy(VectorStoreInterface):
9
  """
10
  Concrete implementation of VectorStoreInterface leveraging Qdrant DB.
11
- Encapsulates schema enforcement, item upsert workflows, and multi-modal calculations.
12
  """
13
  def __init__(self) -> None:
14
- # Establish client connection targeting the docker-compose service network
15
- self.client = QdrantClient(host=settings.QDRANT_HOST, port=settings.QDRANT_PORT)
 
 
 
 
 
 
 
 
 
 
 
16
  self.collection_name = "video_frames"
17
  self._ensure_collection_exists()
18
 
19
  def _ensure_collection_exists(self) -> None:
20
  """
21
- Idempotent schema controller checking for collection persistence
22
- and initializing vector parameters if absent.
23
  """
24
  try:
25
  if not self.client.collection_exists(collection_name=self.collection_name):
 
26
  self.client.create_collection(
27
  collection_name=self.collection_name,
28
  vectors_config=VectorParams(
29
- size=settings.VECTOR_DIMENSION,
30
  distance=Distance.COSINE
31
  )
32
  )
 
33
  except Exception as e:
34
- # Crucial LLD practice: fail fast during object construction if state is compromised
35
- raise RuntimeError(f"Failed initializing Qdrant collection layer: {str(e)}")
 
 
36
 
37
  def upsert_embeddings(self, video_id: str, embeddings: List[List[float]], metadata: List[Dict[str, Any]]) -> bool:
38
  """
@@ -42,15 +58,12 @@ class QdrantVectorStoreStrategy(VectorStoreInterface):
42
  try:
43
  points = []
44
  for idx, (vector, meta) in enumerate(zip(embeddings, metadata)):
45
- # Inject parent relational identifier directly into metadata payload
46
  payload = {
47
  "video_id": video_id,
48
  "timestamp_seconds": meta.get("timestamp_seconds"),
49
  "file_path": meta.get("file_path")
50
  }
51
 
52
- # Use deterministic UUID generation based on the namespace
53
- # to prevent document duplication during re-processing jobs
54
  point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{video_id}_{idx}"))
55
 
56
  points.append(
@@ -61,7 +74,6 @@ class QdrantVectorStoreStrategy(VectorStoreInterface):
61
  )
62
  )
63
 
64
- # Execute transactional batch upsert operation
65
  self.client.upsert(
66
  collection_name=self.collection_name,
67
  wait=True,
@@ -70,7 +82,6 @@ class QdrantVectorStoreStrategy(VectorStoreInterface):
70
  return True
71
 
72
  except Exception as e:
73
- # Structured application reporting block
74
  print(f"[ERROR] Vector storage execution fault for Video {video_id}: {str(e)}")
75
  return False
76
 
@@ -86,7 +97,6 @@ class QdrantVectorStoreStrategy(VectorStoreInterface):
86
  limit=top_k
87
  )
88
 
89
- # Translate raw database structures into standard clean payloads
90
  formatted_results = []
91
  for hit in search_results:
92
  formatted_results.append({
@@ -99,5 +109,4 @@ class QdrantVectorStoreStrategy(VectorStoreInterface):
99
 
100
  except Exception as e:
101
  print(f"[ERROR] Vector distance analysis failed: {str(e)}")
102
- return []
103
-
 
1
+ import os
2
  import uuid
3
  from typing import List, Dict, Any
4
  from qdrant_client import QdrantClient
 
9
  class QdrantVectorStoreStrategy(VectorStoreInterface):
10
  """
11
  Concrete implementation of VectorStoreInterface leveraging Qdrant DB.
12
+ Optimized for zero-downtime hot switches between local development and cloud clusters.
13
  """
14
  def __init__(self) -> None:
15
+ # 1. Prioritize Cloud Environment Secrets over Local Config File definitions
16
+ qdrant_url = os.getenv("QDRANT_URL") or getattr(settings, "QDRANT_URL", None)
17
+ qdrant_api_key = os.getenv("QDRANT_API_KEY") or getattr(settings, "QDRANT_API_KEY", None)
18
+
19
+ if qdrant_url:
20
+ print(f"[QDRANT CLUSTER] Establishing secure connection link to cloud instance: {qdrant_url}")
21
+ self.client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key)
22
+ else:
23
+ host = getattr(settings, "QDRANT_HOST", "localhost")
24
+ port = int(getattr(settings, "QDRANT_PORT", 6333))
25
+ print(f"[QDRANT LOCAL] Establishing standard connection link to container: {host}:{port}")
26
+ self.client = QdrantClient(host=host, port=port)
27
+
28
  self.collection_name = "video_frames"
29
  self._ensure_collection_exists()
30
 
31
  def _ensure_collection_exists(self) -> None:
32
  """
33
+ Idempotent schema controller. Log warnings instead of completely freezing the
34
+ container boot sequence if cloud cloud connections take a moment to synchronize.
35
  """
36
  try:
37
  if not self.client.collection_exists(collection_name=self.collection_name):
38
+ vector_size = getattr(settings, "VECTOR_DIMENSION", 1152) # Uses default SigLIP dimension fallback
39
  self.client.create_collection(
40
  collection_name=self.collection_name,
41
  vectors_config=VectorParams(
42
+ size=vector_size,
43
  distance=Distance.COSINE
44
  )
45
  )
46
+ print(f"[QDRANT SUCCESS] Index space '{self.collection_name}' successfully verified.")
47
  except Exception as e:
48
+ # Prevent system lockouts on cluster re-build operations
49
+ print("\n⚠️ [QDRANT INTERCEPT WARNING] ⚠️")
50
+ print(f"Could not instantly connect to remote collection: {str(e)}")
51
+ print("The gateway instance will continue booting. Hot retries active for incoming user requests.\n")
52
 
53
  def upsert_embeddings(self, video_id: str, embeddings: List[List[float]], metadata: List[Dict[str, Any]]) -> bool:
54
  """
 
58
  try:
59
  points = []
60
  for idx, (vector, meta) in enumerate(zip(embeddings, metadata)):
 
61
  payload = {
62
  "video_id": video_id,
63
  "timestamp_seconds": meta.get("timestamp_seconds"),
64
  "file_path": meta.get("file_path")
65
  }
66
 
 
 
67
  point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{video_id}_{idx}"))
68
 
69
  points.append(
 
74
  )
75
  )
76
 
 
77
  self.client.upsert(
78
  collection_name=self.collection_name,
79
  wait=True,
 
82
  return True
83
 
84
  except Exception as e:
 
85
  print(f"[ERROR] Vector storage execution fault for Video {video_id}: {str(e)}")
86
  return False
87
 
 
97
  limit=top_k
98
  )
99
 
 
100
  formatted_results = []
101
  for hit in search_results:
102
  formatted_results.append({
 
109
 
110
  except Exception as e:
111
  print(f"[ERROR] Vector distance analysis failed: {str(e)}")
112
+ return []