Spaces:
Sleeping
Sleeping
File size: 6,926 Bytes
266d7bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
import opik
from fastapi import Request
from qdrant_client.models import (
FieldCondition,
Filter,
Fusion,
FusionQuery,
MatchText,
MatchValue,
Prefetch,
)
from src.api.models.api_models import SearchResult
from src.infrastructure.qdrant.qdrant_vectorstore import AsyncQdrantVectorStore
from src.utils.logger_util import setup_logging
# Module-level logger used by the query functions in this file.
logger = setup_logging()
@opik.track(name="query_with_filters")
async def query_with_filters(
    request: Request,
    query_text: str = "",
    feed_author: str | None = None,
    feed_name: str | None = None,
    title_keywords: str | None = None,
    limit: int = 5,
) -> list[SearchResult]:
    """Query the vector store with optional filters and return search results.

    Embeds ``query_text`` with the vector store's dense and sparse encoders,
    runs both as prefetches against Qdrant, fuses them with Reciprocal Rank
    Fusion, and applies the optional payload filters. Results are deduplicated
    by point ID and returned in the order Qdrant produced them.

    Args:
        request (Request): FastAPI request object containing the vector store in app.state.
        query_text (str): Text query to search for.
        feed_author (str | None): Optional exact-match filter for the feed author.
        feed_name (str | None): Optional exact-match filter for the feed name.
        title_keywords (str | None): Optional text filter on the title. The value is
            stripped and lower-cased; a blank or whitespace-only value is ignored.
        limit (int): Maximum number of results to return. Values of zero or
            below yield an empty list.

    Returns:
        list[SearchResult]:
            List of search results containing title, feed info, URL, chunk text, and score.
    """
    vectorstore: AsyncQdrantVectorStore = request.app.state.vectorstore
    dense_vector = vectorstore.dense_vectors([query_text])[0]
    sparse_vector = vectorstore.sparse_vectors([query_text])[0]

    # Build filter conditions
    conditions: list[FieldCondition] = []
    if feed_author:
        conditions.append(FieldCondition(key="feed_author", match=MatchValue(value=feed_author)))
    if feed_name:
        conditions.append(FieldCondition(key="feed_name", match=MatchValue(value=feed_name)))
    # Normalise before testing: a whitespace-only value is truthy but would
    # otherwise end up as a MatchText filter with an empty string.
    title_text = title_keywords.strip().lower() if title_keywords else ""
    if title_text:
        conditions.append(FieldCondition(key="title", match=MatchText(text=title_text)))
    query_filter = Filter(must=conditions) if conditions else None  # type: ignore

    # Clamp once so a negative limit can never act as a "drop from the end"
    # slice bound; the fetch size keeps its floor of one.
    max_results = max(0, limit)
    fetch_limit = max(1, limit) * 100
    logger.info(f"Fetching up to {fetch_limit} points for unique Ids.")

    response = await vectorstore.client.query_points(
        collection_name=vectorstore.collection_name,
        query=FusionQuery(fusion=Fusion.RRF),
        prefetch=[
            Prefetch(query=dense_vector, using="Dense", limit=fetch_limit, filter=query_filter),
            Prefetch(query=sparse_vector, using="Sparse", limit=fetch_limit, filter=query_filter),
        ],
        query_filter=query_filter,
        limit=fetch_limit,
    )

    # Deduplicate by point ID (Qdrant point IDs may be integers or strings).
    seen_ids: set[int | str] = set()
    results: list[SearchResult] = []
    for point in response.points:
        # Stop as soon as enough results are collected instead of building a
        # SearchResult for every fetched point and truncating afterwards.
        if len(results) >= max_results:
            break
        if point.id in seen_ids:
            continue
        seen_ids.add(point.id)  # type: ignore
        payload = point.payload or {}
        results.append(
            SearchResult(
                title=payload.get("title", ""),
                feed_author=payload.get("feed_author"),
                feed_name=payload.get("feed_name"),
                article_author=payload.get("article_authors"),
                url=payload.get("url"),
                chunk_text=payload.get("chunk_text"),
                score=point.score,
            )
        )

    logger.info(f"Returning {len(results)} results for matching query '{query_text}'")
    return results
@opik.track(name="query_unique_titles")
async def query_unique_titles(
    request: Request,
    query_text: str,
    feed_author: str | None = None,
    feed_name: str | None = None,
    title_keywords: str | None = None,
    limit: int = 5,
) -> list[SearchResult]:
    """Query the vector store and return at most one result per article title.

    Embeds ``query_text`` with the vector store's dense and sparse encoders,
    runs both as prefetches against Qdrant, fuses them with Reciprocal Rank
    Fusion, and applies the optional payload filters. Because several points
    can share a title, the query over-fetches by a fixed factor of ``limit``
    and keeps only the first point seen for each title. Points without a
    title are skipped.

    Args:
        request (Request): FastAPI request object containing the vector store in app.state.
        query_text (str): Text query to search for.
        feed_author (str | None): Optional exact-match filter for the feed author.
        feed_name (str | None): Optional exact-match filter for the feed name.
        title_keywords (str | None): Optional text filter on the title. The value is
            stripped and lower-cased; a blank or whitespace-only value is ignored.
        limit (int): Maximum number of unique results to return. Values of
            zero or below yield an empty list.

    Returns:
        list[SearchResult]:
            List of unique search results containing title, feed info, URL, chunk text, and score.
    """
    vectorstore: AsyncQdrantVectorStore = request.app.state.vectorstore
    dense_vector = vectorstore.dense_vectors([query_text])[0]
    sparse_vector = vectorstore.sparse_vectors([query_text])[0]

    # Build filter conditions
    conditions: list[FieldCondition] = []
    if feed_author:
        conditions.append(FieldCondition(key="feed_author", match=MatchValue(value=feed_author)))
    if feed_name:
        conditions.append(FieldCondition(key="feed_name", match=MatchValue(value=feed_name)))
    # Normalise before testing: a whitespace-only value is truthy but would
    # otherwise end up as a MatchText filter with an empty string.
    title_text = title_keywords.strip().lower() if title_keywords else ""
    if title_text:
        conditions.append(FieldCondition(key="title", match=MatchText(text=title_text)))
    query_filter = Filter(must=conditions) if conditions else None  # type: ignore

    # Clamp once so a non-positive limit returns nothing rather than one
    # result; the fetch size keeps its floor of one.
    max_results = max(0, limit)
    fetch_limit = max(1, limit) * 280
    logger.info(f"Fetching up to {fetch_limit} points for unique titles.")

    response = await vectorstore.client.query_points(
        collection_name=vectorstore.collection_name,
        query=FusionQuery(fusion=Fusion.RRF),
        prefetch=[
            Prefetch(query=dense_vector, using="Dense", limit=fetch_limit, filter=query_filter),
            Prefetch(query=sparse_vector, using="Sparse", limit=fetch_limit, filter=query_filter),
        ],
        query_filter=query_filter,
        limit=fetch_limit,
    )

    # Deduplicate by title
    seen_titles: set[str] = set()
    results: list[SearchResult] = []
    for point in response.points:
        # Checked before appending so that limit <= 0 yields an empty list.
        if len(results) >= max_results:
            break
        payload = point.payload or {}
        title = payload.get("title")
        if not title or title in seen_titles:
            continue
        seen_titles.add(title)
        results.append(
            SearchResult(
                title=title,
                feed_author=payload.get("feed_author"),
                feed_name=payload.get("feed_name"),
                article_author=payload.get("article_authors"),
                url=payload.get("url"),
                chunk_text=payload.get("chunk_text"),
                score=point.score,
            )
        )

    logger.info(f"Returning {len(results)} unique title results for matching query '{query_text}'")
    return results
|