AdithyaVardan committed on
Commit
1fbfa0e
·
1 Parent(s): cbf8345

Fix Qdrant search API, add Supabase writes to agent pipelines, pin qdrant-client 1.17.1

Browse files
agent/tools/doc_search.py CHANGED
@@ -144,51 +144,34 @@ async def run_doc_search(query: str, team_id: str) -> list[RetrievedChunk]:
144
 
145
  try:
146
  client = AsyncQdrantClient(host=settings.qdrant_host, port=settings.qdrant_port)
147
- results = await client.search(
 
 
 
 
148
  collection_name=settings.qdrant_collection,
149
- query_vector=qmodels.NamedVector(
150
- name=settings.qdrant_dense_vector_name,
151
- vector=dense_vector,
152
- ),
153
- query_filter=qmodels.Filter(
154
- must=[
155
- qmodels.FieldCondition(
156
- key="team_id",
157
- match=qmodels.MatchValue(value=team_id),
158
- )
159
- ]
160
- ),
161
  limit=settings.rrf_top_k,
162
  with_payload=True,
163
  )
164
- for hit in results:
165
  doc_id = hit.payload.get("chunk_id", str(hit.id))
166
  qdrant_ranked_ids.append(doc_id)
167
  qdrant_payload_map[doc_id] = hit.payload
168
  qdrant_score_map[doc_id] = hit.score
169
 
170
- sparse_results = await client.search(
171
  collection_name=settings.qdrant_collection,
172
- query_vector=qmodels.NamedSparseVector(
173
- name=settings.qdrant_sparse_vector_name,
174
- vector=qmodels.SparseVector(
175
- indices=sparse_indices,
176
- values=sparse_values,
177
- ),
178
- ),
179
- query_filter=qmodels.Filter(
180
- must=[
181
- qmodels.FieldCondition(
182
- key="team_id",
183
- match=qmodels.MatchValue(value=team_id),
184
- )
185
- ]
186
- ),
187
  limit=settings.rrf_top_k,
188
  with_payload=True,
189
  )
190
  sparse_ranked_ids: list[str] = []
191
- for hit in sparse_results:
192
  doc_id = hit.payload.get("chunk_id", str(hit.id))
193
  sparse_ranked_ids.append(doc_id)
194
  qdrant_payload_map.setdefault(doc_id, hit.payload)
 
144
 
145
  try:
146
  client = AsyncQdrantClient(host=settings.qdrant_host, port=settings.qdrant_port)
147
+ team_filter = qmodels.Filter(
148
+ must=[qmodels.FieldCondition(key="team_id", match=qmodels.MatchValue(value=team_id))]
149
+ )
150
+
151
+ dense_response = await client.query_points(
152
  collection_name=settings.qdrant_collection,
153
+ query=dense_vector,
154
+ using=settings.qdrant_dense_vector_name,
155
+ query_filter=team_filter,
 
 
 
 
 
 
 
 
 
156
  limit=settings.rrf_top_k,
157
  with_payload=True,
158
  )
159
+ for hit in dense_response.points:
160
  doc_id = hit.payload.get("chunk_id", str(hit.id))
161
  qdrant_ranked_ids.append(doc_id)
162
  qdrant_payload_map[doc_id] = hit.payload
163
  qdrant_score_map[doc_id] = hit.score
164
 
165
+ sparse_response = await client.query_points(
166
  collection_name=settings.qdrant_collection,
167
+ query=qmodels.SparseVector(indices=sparse_indices, values=sparse_values),
168
+ using=settings.qdrant_sparse_vector_name,
169
+ query_filter=team_filter,
 
 
 
 
 
 
 
 
 
 
 
 
170
  limit=settings.rrf_top_k,
171
  with_payload=True,
172
  )
173
  sparse_ranked_ids: list[str] = []
174
+ for hit in sparse_response.points:
175
  doc_id = hit.payload.get("chunk_id", str(hit.id))
176
  sparse_ranked_ids.append(doc_id)
177
  qdrant_payload_map.setdefault(doc_id, hit.payload)
requirements.txt CHANGED
@@ -10,7 +10,7 @@ FlagEmbedding==1.2.11
10
  gliner==0.2.13
11
 
12
  # Vector database
13
- qdrant-client==1.12.1
14
 
15
  # BM25
16
  rank-bm25==0.2.2
@@ -82,6 +82,3 @@ watchdog==6.0.0
82
  # File agent — OCR fallback for scanned PDFs (optional but recommended)
83
  pytesseract==0.3.13
84
  Pillow==11.1.0
85
-
86
- # FastAPI file upload support
87
- python-multipart==0.0.20
 
10
  gliner==0.2.13
11
 
12
  # Vector database
13
+ qdrant-client==1.17.1
14
 
15
  # BM25
16
  rank-bm25==0.2.2
 
82
  # File agent — OCR fallback for scanned PDFs (optional but recommended)
83
  pytesseract==0.3.13
84
  Pillow==11.1.0
 
 
 
src/confluence_agent/pipeline.py CHANGED
@@ -4,6 +4,7 @@ import logging
4
 
5
  from ingestion.pipeline.embedder import embed_chunks
6
  from ingestion.pipeline.pii_masker import mask_chunks
 
7
  from ingestion.storage.qdrant_store import delete_chunks_for_doc, upsert_chunks
8
  from src.confluence_agent.adapter import ConfluenceAdapter
9
  from src.confluence_agent.chunker import chunk_confluence_page
@@ -12,6 +13,14 @@ from src.confluence_agent.config import confluence_config
12
  logger = logging.getLogger(__name__)
13
 
14
 
 
 
 
 
 
 
 
 
15
  async def ingest_page(page_id: str, space_key: str = "", team_id: str = "") -> int:
16
  team_id = team_id or confluence_config.team_id
17
  adapter = ConfluenceAdapter(team_id=team_id)
@@ -32,8 +41,7 @@ async def ingest_page(page_id: str, space_key: str = "", team_id: str = "") -> i
32
  chunk.text = m
33
 
34
  embedded = embed_chunks(chunks)
35
- delete_chunks_for_doc(raw_doc.doc_id)
36
- upsert_chunks(embedded)
37
 
38
  logger.info("confluence_pipeline: stored %d chunks for page %s", len(embedded), page_id)
39
  return len(embedded)
@@ -54,8 +62,7 @@ async def ingest_space(space_key: str, team_id: str = "") -> int:
54
  for chunk, m in zip(chunks, masked):
55
  chunk.text = m
56
  embedded = embed_chunks(chunks)
57
- delete_chunks_for_doc(raw_doc.doc_id)
58
- upsert_chunks(embedded)
59
  total += len(embedded)
60
  logger.info("confluence_pipeline: stored %d chunks for page %s", len(embedded), pid)
61
 
 
4
 
5
  from ingestion.pipeline.embedder import embed_chunks
6
  from ingestion.pipeline.pii_masker import mask_chunks
7
+ from ingestion.storage import supabase_store
8
  from ingestion.storage.qdrant_store import delete_chunks_for_doc, upsert_chunks
9
  from src.confluence_agent.adapter import ConfluenceAdapter
10
  from src.confluence_agent.chunker import chunk_confluence_page
 
13
  logger = logging.getLogger(__name__)
14
 
15
 
16
+ def _store(raw_doc, embedded):
17
+ supabase_store.upsert_document(raw_doc)
18
+ supabase_store.delete_chunks_for_doc(raw_doc.doc_id)
19
+ supabase_store.upsert_chunks(embedded)
20
+ delete_chunks_for_doc(raw_doc.doc_id)
21
+ upsert_chunks(embedded)
22
+
23
+
24
  async def ingest_page(page_id: str, space_key: str = "", team_id: str = "") -> int:
25
  team_id = team_id or confluence_config.team_id
26
  adapter = ConfluenceAdapter(team_id=team_id)
 
41
  chunk.text = m
42
 
43
  embedded = embed_chunks(chunks)
44
+ _store(raw_doc, embedded)
 
45
 
46
  logger.info("confluence_pipeline: stored %d chunks for page %s", len(embedded), page_id)
47
  return len(embedded)
 
62
  for chunk, m in zip(chunks, masked):
63
  chunk.text = m
64
  embedded = embed_chunks(chunks)
65
+ _store(raw_doc, embedded)
 
66
  total += len(embedded)
67
  logger.info("confluence_pipeline: stored %d chunks for page %s", len(embedded), pid)
68
 
src/file_agent/pipeline.py CHANGED
@@ -1,10 +1,13 @@
1
  from __future__ import annotations
2
 
 
3
  import logging
4
  from pathlib import Path
5
 
 
6
  from ingestion.pipeline.embedder import embed_chunks
7
  from ingestion.pipeline.pii_masker import mask_chunks
 
8
  from ingestion.storage.qdrant_store import delete_chunks_for_doc, upsert_chunks
9
  from src.file_agent.chunker import chunk_file_content
10
  from src.file_agent.config import file_config
@@ -16,11 +19,15 @@ logger = logging.getLogger(__name__)
16
  _SUPPORTED_FORMATS = {"pdf", "docx", "xml", "text", "csv", "xlsx", "html"}
17
 
18
 
 
 
 
 
 
 
 
 
19
  def process_file(file_path: str, team_id: str = "") -> int:
20
- """
21
- Full pipeline: detect → parse → chunk → PII mask → embed → upsert Qdrant.
22
- Returns the number of chunks stored. Raises on fatal errors.
23
- """
24
  team_id = team_id or file_config.team_id
25
  fmt = detect_format(file_path)
26
 
@@ -44,11 +51,18 @@ def process_file(file_path: str, team_id: str = "") -> int:
44
  chunk.text = m
45
 
46
  embedded = embed_chunks(chunks)
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
- # Idempotent: all chunks share the same doc_id derived from file name
49
- doc_id = embedded[0].doc_id
50
- delete_chunks_for_doc(doc_id)
51
- upsert_chunks(embedded)
52
-
53
- logger.info("file_pipeline: stored %d chunks for %s (format=%s)", len(embedded), Path(file_path).name, fmt)
54
  return len(embedded)
 
1
  from __future__ import annotations
2
 
3
+ import hashlib
4
  import logging
5
  from pathlib import Path
6
 
7
+ from ingestion.models import RawDocument
8
  from ingestion.pipeline.embedder import embed_chunks
9
  from ingestion.pipeline.pii_masker import mask_chunks
10
+ from ingestion.storage import supabase_store
11
  from ingestion.storage.qdrant_store import delete_chunks_for_doc, upsert_chunks
12
  from src.file_agent.chunker import chunk_file_content
13
  from src.file_agent.config import file_config
 
19
  _SUPPORTED_FORMATS = {"pdf", "docx", "xml", "text", "csv", "xlsx", "html"}
20
 
21
 
22
+ def _store(raw_doc, embedded):
23
+ supabase_store.upsert_document(raw_doc)
24
+ supabase_store.delete_chunks_for_doc(raw_doc.doc_id)
25
+ supabase_store.upsert_chunks(embedded)
26
+ delete_chunks_for_doc(raw_doc.doc_id)
27
+ upsert_chunks(embedded)
28
+
29
+
30
  def process_file(file_path: str, team_id: str = "") -> int:
 
 
 
 
31
  team_id = team_id or file_config.team_id
32
  fmt = detect_format(file_path)
33
 
 
51
  chunk.text = m
52
 
53
  embedded = embed_chunks(chunks)
54
+ file_name = Path(file_path).name
55
+ doc_id = hashlib.sha256(f"file:{file_name}".encode()).hexdigest()
56
+ raw_doc = RawDocument(
57
+ doc_id=doc_id,
58
+ title=file_name,
59
+ content="",
60
+ source_url=f"file://{Path(file_path).resolve().as_posix()}",
61
+ source_type="file",
62
+ team_id=team_id,
63
+ metadata={"file_name": file_name, "format": fmt},
64
+ )
65
+ _store(raw_doc, embedded)
66
 
67
+ logger.info("file_pipeline: stored %d chunks for %s (format=%s)", len(embedded), file_name, fmt)
 
 
 
 
 
68
  return len(embedded)
src/jira_agent/pipeline.py CHANGED
@@ -4,6 +4,7 @@ import logging
4
 
5
  from ingestion.pipeline.embedder import embed_chunks
6
  from ingestion.pipeline.pii_masker import mask_chunks
 
7
  from ingestion.storage.qdrant_store import delete_chunks_for_doc, upsert_chunks
8
  from src.jira_agent.adapter import JiraAdapter
9
  from src.jira_agent.chunker import chunk_jira_issue
@@ -12,11 +13,15 @@ from src.jira_agent.config import jira_config
12
  logger = logging.getLogger(__name__)
13
 
14
 
 
 
 
 
 
 
 
 
15
  async def ingest_issue(issue_key: str, team_id: str = "") -> int:
16
- """
17
- Full pipeline for a single Jira issue.
18
- Returns the number of chunks stored.
19
- """
20
  team_id = team_id or jira_config.team_id
21
  adapter = JiraAdapter(team_id=team_id)
22
 
@@ -36,17 +41,13 @@ async def ingest_issue(issue_key: str, team_id: str = "") -> int:
36
  chunk.text = masked
37
 
38
  embedded = embed_chunks(chunks)
39
-
40
- # Idempotent: remove old vectors before upserting new ones
41
- delete_chunks_for_doc(raw_doc.doc_id)
42
- upsert_chunks(embedded)
43
 
44
  logger.info("jira_pipeline: stored %d chunks for %s", len(embedded), issue_key)
45
  return len(embedded)
46
 
47
 
48
  async def ingest_project(project_key: str, team_id: str = "") -> int:
49
- """Full sync of all issues in a project. Returns total chunks stored."""
50
  team_id = team_id or jira_config.team_id
51
  adapter = JiraAdapter(team_id=team_id)
52
  docs = await adapter.fetch_all(project_key)
@@ -61,8 +62,7 @@ async def ingest_project(project_key: str, team_id: str = "") -> int:
61
  for chunk, masked in zip(chunks, masked_texts):
62
  chunk.text = masked
63
  embedded = embed_chunks(chunks)
64
- delete_chunks_for_doc(raw_doc.doc_id)
65
- upsert_chunks(embedded)
66
  total += len(embedded)
67
  logger.info("jira_pipeline: stored %d chunks for %s", len(embedded), key)
68
 
 
4
 
5
  from ingestion.pipeline.embedder import embed_chunks
6
  from ingestion.pipeline.pii_masker import mask_chunks
7
+ from ingestion.storage import supabase_store
8
  from ingestion.storage.qdrant_store import delete_chunks_for_doc, upsert_chunks
9
  from src.jira_agent.adapter import JiraAdapter
10
  from src.jira_agent.chunker import chunk_jira_issue
 
13
  logger = logging.getLogger(__name__)
14
 
15
 
16
+ def _store(raw_doc, embedded):
17
+ supabase_store.upsert_document(raw_doc)
18
+ supabase_store.delete_chunks_for_doc(raw_doc.doc_id)
19
+ supabase_store.upsert_chunks(embedded)
20
+ delete_chunks_for_doc(raw_doc.doc_id)
21
+ upsert_chunks(embedded)
22
+
23
+
24
  async def ingest_issue(issue_key: str, team_id: str = "") -> int:
 
 
 
 
25
  team_id = team_id or jira_config.team_id
26
  adapter = JiraAdapter(team_id=team_id)
27
 
 
41
  chunk.text = masked
42
 
43
  embedded = embed_chunks(chunks)
44
+ _store(raw_doc, embedded)
 
 
 
45
 
46
  logger.info("jira_pipeline: stored %d chunks for %s", len(embedded), issue_key)
47
  return len(embedded)
48
 
49
 
50
  async def ingest_project(project_key: str, team_id: str = "") -> int:
 
51
  team_id = team_id or jira_config.team_id
52
  adapter = JiraAdapter(team_id=team_id)
53
  docs = await adapter.fetch_all(project_key)
 
62
  for chunk, masked in zip(chunks, masked_texts):
63
  chunk.text = masked
64
  embedded = embed_chunks(chunks)
65
+ _store(raw_doc, embedded)
 
66
  total += len(embedded)
67
  logger.info("jira_pipeline: stored %d chunks for %s", len(embedded), key)
68