# Source: not-google / api / search.py
# Page header residue: sae8d — "Upload search.py" — 5916ef3 (verified)
from db.supabase_client import get_supabase
from indexer.preprocess import preprocess
from ddgs import DDGS
def fetch_ddgs_results(query: str, max_results: int = 5):
    """Fetch web and image results from DuckDuckGo via ``ddgs``.

    If *query* contains the wildcards ``*`` or ``?``, it is first expanded
    into up to five matching terms looked up with ``LIKE`` in the
    ``inverted_index`` table and joined with ``OR``. When that lookup fails
    or matches nothing, the raw query is sent unchanged.

    The function is best-effort: every failure is printed and swallowed, and
    whatever was fetched successfully is still returned. The text and image
    searches fail independently of each other.

    Args:
        query: User search string, optionally containing ``*`` / ``?``.
        max_results: Maximum number of text results and of image results.

    Returns:
        A ``(results, images)`` tuple of lists; either list may be empty.
    """
    results = []
    images = []

    # Nothing to search for: skip the database and network round-trips.
    if not query or not query.strip():
        return results, images

    # Handle wildcard expansion for DDGS.
    ddg_query = query
    if "*" in query or "?" in query:
        try:
            # Map the user-facing wildcards onto their SQL LIKE equivalents.
            sql_pattern = query.replace("*", "%").replace("?", "_")
            terms_resp = (
                get_supabase()
                .table("inverted_index")
                .select("term")
                .like("term", sql_pattern)
                .limit(5)
                .execute()
            )
            # The same term can come back in several rows, so de-duplicate;
            # sort so the query sent to DuckDuckGo is deterministic.
            matching_terms = sorted({row["term"] for row in terms_resp.data or []})
            if matching_terms:
                # Use OR to search for any of the matching terms.
                ddg_query = " OR ".join(matching_terms)
        except Exception as e:
            print(f"Error expanding wildcard for DDGS: {e}")

    try:
        with DDGS() as ddgs:
            # Each search gets its own guard so that a failing text search
            # does not prevent the image search from running (and vice versa).
            try:
                results = list(
                    ddgs.text(ddg_query, max_results=max_results, safesearch="on")
                )
            except Exception as e:
                print(f"Error fetching DDGS results: {e}")
            try:
                images = list(
                    ddgs.images(ddg_query, max_results=max_results, safesearch="on")
                )
            except Exception as e:
                print(f"Error fetching DDGS image results: {e}")
    except Exception as e:
        # Raised while creating or closing the DDGS session itself.
        print(f"Error fetching DDGS results: {e}")

    return results, images
# Documents whose summed TF-IDF score is below this value are dropped.
_MIN_SCORE = 0.01
# Ranked lists are cut at ``k * _OVERFETCH_FACTOR`` entries; the original code
# over-fetched by this factor "to allow for filtering".
_OVERFETCH_FACTOR = 2


def _document_info(row):
    """Build the url/title/image_url dict from a posting row's ``documents`` join."""
    docs = row.get("documents")
    # The joined value may arrive as a single dict or as a list of dicts.
    if isinstance(docs, list):
        docs = docs[0] if docs else None
    if not docs:
        return {"url": "#", "title": "Missing Document Info", "image_url": None}
    return {
        # ``or`` (not a .get default) so NULL columns also fall back.
        "url": docs.get("url") or "",
        "title": docs.get("title") or "Untitled",
        "image_url": docs.get("image_url"),
    }


def _rank_by_tfidf(supabase, terms, k):
    """Rank the documents containing any of *terms* by summed ``tf * idf``.

    The aggregation is done in Python rather than in a single SQL query or
    RPC: postings (with their joined document info) come from
    ``inverted_index`` and IDF values are fetched separately from
    ``term_stats``.

    Args:
        supabase: Client object returned by ``get_supabase()``.
        terms: Non-empty list of index terms to score against.
        k: Requested result count; at most ``k * _OVERFETCH_FACTOR`` entries
            are returned.

    Returns:
        A list of dicts with keys ``rank``, ``doc_id``, ``url``, ``title``,
        ``image_url`` and ``score`` (rounded to 4 places), best score first.
    """
    # 1. Fetch inverted index entries together with the document info.
    postings = (
        supabase.table("inverted_index")
        .select("term, doc_id, tf, documents(url, title, image_url)")
        .in_("term", terms)
        .execute()
    )
    if not postings.data:
        return []

    # 2. Fetch IDF values from term_stats separately to avoid join errors.
    stats = (
        supabase.table("term_stats")
        .select("term, idf")
        .in_("term", terms)
        .execute()
    )
    term_to_idf = {row["term"]: row["idf"] for row in stats.data or []}

    # 3. Accumulate tf * idf per document.
    scores = {}
    doc_info = {}
    for row in postings.data:
        doc_id = row["doc_id"]
        # Treat NULL tf / idf (and terms without stats) as contributing 0.
        tf = row["tf"] or 0
        idf = term_to_idf.get(row["term"]) or 0
        scores[doc_id] = scores.get(doc_id, 0) + tf * idf
        if doc_id not in doc_info:
            doc_info[doc_id] = _document_info(row)

    # 4. Sort by score descending and keep the over-fetched head of the list.
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    ranked = ranked[: k * _OVERFETCH_FACTOR]

    results = []
    for rank, (doc_id, score) in enumerate(ranked, 1):
        if score < _MIN_SCORE:
            # Sorted descending, so every remaining score is below the cut too.
            break
        results.append(
            {
                "rank": rank,
                "doc_id": doc_id,
                **doc_info[doc_id],
                "score": round(score, 4),
            }
        )
    return results


def rank_search(query: str, k: int = 10):
    """Return documents ranked by TF-IDF for a free-text *query*.

    The query is tokenised with ``preprocess`` and the resulting terms are
    scored against the index.

    Args:
        query: User search string.
        k: Requested result count. Up to ``2 * k`` results are returned
            (over-fetch kept from the original implementation).
            NOTE(review): confirm callers trim the list to ``k``.

    Returns:
        A list of result dicts (``rank``, ``doc_id``, ``url``, ``title``,
        ``image_url``, ``score``); empty when the query is blank, yields no
        terms, matches nothing, or ``k`` is not positive.
    """
    # A non-positive k would otherwise turn the slice into "drop from the end".
    if k <= 0 or not query or not query.strip():
        return []
    query_terms = preprocess(query)
    if not query_terms:
        return []
    return _rank_by_tfidf(get_supabase(), query_terms, k)


def wildcard_search(query: str, k: int = 10):
    """Return documents ranked by TF-IDF for a wildcard *query*.

    ``*`` matches any run of characters and ``?`` a single character. The
    pattern is matched against index terms with SQL ``LIKE`` and every
    matching term then takes part in the normal TF-IDF ranking.

    NOTE(review): literal ``%`` and ``_`` in *query* are not escaped and
    therefore also act as LIKE wildcards.

    Args:
        query: Wildcard pattern typed by the user.
        k: Requested result count. Up to ``2 * k`` results are returned
            (over-fetch kept from the original implementation).

    Returns:
        A list of result dicts (``rank``, ``doc_id``, ``url``, ``title``,
        ``image_url``, ``score``); empty when the query is blank, no term
        matches, or ``k`` is not positive.
    """
    if k <= 0 or not query or not query.strip():
        return []

    supabase = get_supabase()

    # Map * to % and ? to _.
    sql_pattern = query.replace("*", "%").replace("?", "_")

    # 1. Find matching terms in inverted_index using LIKE. The original author
    #    notes this is served by a GIN trigram index (not verifiable here).
    terms_response = (
        supabase.table("inverted_index")
        .select("term")
        .like("term", sql_pattern)
        .execute()
    )
    # The same term can come back in several rows, so de-duplicate.
    matching_terms = sorted({row["term"] for row in terms_response.data or []})
    if not matching_terms:
        return []

    # 2. Perform the regular ranking with the matching terms.
    return _rank_by_tfidf(supabase, matching_terms, k)