# Socrates_docker / db7_utils.py
# (Hugging Face commit 0b170f9 by alesamodio: "remove fet_user_id and get_username")
import time
import urllib.parse
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

from config import HF_EMBEDDING_MODEL, GNEWS_KEY
from db3_utils import query_llm, translate_term, fetch_search
from db6_utils import fetch_full_article
from supabase_ie import save_faiss_to_supabase, download_faiss_from_supabase
# Module-level embedding model shared by every FAISS helper below.
# normalize_embeddings=True makes the encoder emit unit-length vectors.
embeddings = HuggingFaceEmbeddings(
model_name=HF_EMBEDDING_MODEL,
encode_kwargs={"normalize_embeddings": True},
)
# # --- Step 1: Extract keywords ---
# def extract_keywords_from_query(query: str, max_terms: int = 3) -> List[str]:
# """
# Use LLM to extract keywords from a free-form query.
# """
# prompt = (
# f"Extract up to {max_terms} concise keywords or short phrases from this query for a news search:\n\n{query}\n\n"
# "Return as a comma-separated list."
# )
# result = query_llm(prompt)
# return [kw.strip() for kw in result.split(",") if kw.strip()]
# --- Step 2: Translate keywords ---
# def translate_keywords(keywords: List[str], langs: List[str]) -> Dict[str, Dict[str, str]]:
# """
# Map keywords into different languages.
# Returns {keyword: {lang: translated_term}}
# """
# translated_map: Dict[str, Dict[str, str]] = {}
# for kw in keywords:
# translated_map[kw] = {}
# for lang in langs:
# if lang == "en":
# translated_map[kw][lang] = kw
# else:
# translated_map[kw][lang] = translate_term(kw, lang)
# return translated_map
# --- Step 3: Fetch articles from GNews ---
# def debug_fetch():
# """
# Temporary debug function to test GNews queries with different parameter tweaks.
# """
# test_variants = [
# {"label": "original", "q": "Sinner AND Beijing ATP tournament AND performance", "use_dates": True, "country": None},
# {"label": "no_dates", "q": "Sinner AND Beijing ATP tournament AND performance", "use_dates": False, "country": None},
# {"label": "simple_keyword", "q": "sport", "use_dates": True, "country": None},
# {"label": "with_country", "q": "sport", "use_dates": True, "country": "us"},
# {"label": "sort_relevance", "q": "sport", "use_dates": True, "country": None},
# ]
# now = datetime.utcnow().replace(microsecond=0)
# date_from = (now - timedelta(days=3)).isoformat() + "Z"
# date_to = now.isoformat() + "Z"
# results = {}
# for variant in test_variants:
# print(f"\n[DEBUG FETCH] Running variant: {variant['label']}")
# try:
# rows = fetch_search(
# api_key=None,
# bucket="db7",
# q=variant["q"],
# lang="en",
# country=variant["country"],
# max_items=10,
# #pages=1,
# #sort_by=variant["sort_by"],
# date_from=None, #date_from if variant["use_dates"] else None,
# date_to=None, #date_to if variant["use_dates"] else None,
# )
# print(f" ➡️ Found {len(rows)} articles")
# results[variant["label"]] = len(rows)
# except Exception as e:
# print(f"⚠️ Error in {variant['label']} fetch: {e}")
# results[variant["label"]] = "error"
# return results
# temporary change
# def fetch_articles_for_keywords(
# news_topic: Dict[str, Dict[str, str]],
# lookback_days: int = 3,
# max_items: int = 50,
# min_results: int = 5
# ) -> List[Dict[str, Any]]:
# """
# Fetch articles for each keyword/lang pair using GNews.
# Strategy:
# 1. Try an AND query (all keywords together).
# 2. If too few results, fall back to OR query.
# Debug:
# - Print final date_from/date_to strings.
# - Print queries before calling fetch_search.
# - Print full URL params if request fails.
# """
# results = []
# now = datetime.utcnow().replace(microsecond=0)
# date_from = (now - timedelta(days=lookback_days)).isoformat() + "Z"
# date_to = now.isoformat() + "Z"
# print(f"[DB7][DEBUG] date_from={date_from}, date_to={date_to}")
# keywords = list(news_topic.keys())
# for lang in set(l for langs in news_topic.values() for l in langs.keys()):
# terms = [news_topic[kw].get(lang, kw) for kw in keywords if news_topic[kw].get(lang)]
# if not terms:
# continue
# # --- AND query ---
# and_query = " AND ".join(terms)
# print(f"[DB7][DEBUG] Trying AND query for lang={lang}: {and_query}")
# try:
# rows = fetch_search(
# api_key=GNEWS_KEY,
# bucket="db7",
# q=and_query,
# lang=lang,
# country=None,
# max_items=max_items,
# pages=1,
# sort_by="popularity",
# date_from=date_from,
# date_to=date_to,
# )
# print(f" ➡️ Found {len(rows)} articles (AND)")
# results.extend(rows)
# time.sleep(1)
# except Exception as e:
# print(f"⚠️ Error in AND fetch for lang={lang}, query='{and_query}'")
# print(f" date_from={date_from}, date_to={date_to}")
# print(f" Exception: {e}")
# rows = []
# # --- OR fallback ---
# if len(rows) < min_results:
# or_query = " OR ".join(terms)
# print(f"[DB7][DEBUG] Trying OR query for lang={lang}: {or_query}")
# try:
# rows_or = fetch_search(
# api_key=GNEWS_KEY,
# bucket="db7",
# q=or_query,
# lang=lang,
# country=None,
# max_items=max_items,
# pages=1,
# sort_by="relevance",
# date_from=date_from,
# date_to=date_to,
# )
# print(f" ➡️ Found {len(rows_or)} articles (OR fallback)")
# results.extend(rows_or)
# time.sleep(1)
# except Exception as e:
# print(f"⚠️ Error in OR fetch for lang={lang}, query='{or_query}'")
# print(f" date_from={date_from}, date_to={date_to}")
# print(f" Exception: {e}")
# return results
def fetch_articles_for_keywords(
    news_topic: list[str],
    lookback_days: int = 3,
    max_items: int = 50,
    min_results: int = 5,
    user_countries: list[str] | None = None,
    user_lang: list[str] | None = None
) -> List[Dict[str, Any]]:
    """
    Fetch news articles for a list of keywords across languages and countries.

    Steps:
        1. Translate ``news_topic`` into each requested language (English
           terms are used as-is); "en" is always searched.
        2. For every (lang, country) pair, run an AND query first; if it
           yields fewer than ``min_results`` rows, fall back to an OR query
           (skipped when there is a single term, since both queries are
           identical).
        3. De-duplicate the collected rows by URL — AND/OR result sets and
           multiple countries overlap.
        4. Translate each article's title and description into English via
           the LLM, stored under "title_en" / "description_en".

    Args:
        news_topic: Keywords or short phrases to search for.
        lookback_days: Size of the search window, in days before now.
        max_items: Maximum articles requested per fetch_search call.
        min_results: Threshold below which the OR fallback is attempted.
        user_countries: Optional GNews country codes; None means one global
            (country-less) search per language.
        user_lang: Optional extra language codes on top of "en".

    Returns:
        De-duplicated article dicts, each augmented with "title_en" and
        "description_en" keys.
    """
    results: List[Dict[str, Any]] = []
    # Timezone-aware "now" (datetime.utcnow() is deprecated); tzinfo is then
    # stripped so isoformat() + "Z" keeps the exact "YYYY-MM-DDTHH:MM:SSZ"
    # string shape the original code sent to the API.
    now = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    date_from = (now - timedelta(days=lookback_days)).isoformat() + "Z"
    date_to = now.isoformat() + "Z"
    print(f"[DB7][DEBUG] date_from={date_from}, date_to={date_to}")
    if not news_topic:
        print("⚠️ No keywords provided")
        return results
    # Ensure we always include English
    langs = set(user_lang or [])
    langs.add("en")
    # --- Loop over each user language + country ---
    for lang in langs:
        # Translate topic terms if needed
        if lang == "en":
            translated_terms = news_topic
        else:
            translated_terms = [translate_term(term, lang) for term in news_topic]
        # Build AND and OR queries
        and_query = " AND ".join(translated_terms)
        or_query = " OR ".join(translated_terms)
        countries_to_check = user_countries if user_countries else [None]
        for country in countries_to_check:
            # --- AND query first ---
            try:
                print(f"[DB7][DEBUG] Trying AND query lang={lang}, country={country}: {and_query}")
                rows = fetch_search(
                    api_key=GNEWS_KEY,
                    bucket="db7",
                    q=and_query,
                    lang=lang,
                    country=country,
                    max_items=max_items,
                    pages=1,
                    sort_by="relevance",
                    date_from=date_from,
                    date_to=date_to,
                )
                print(f" ➡️ Found {len(rows)} articles (AND)")
                results.extend(rows)
                time.sleep(1)  # stay polite with the GNews rate limit
            except Exception as e:
                print(f"⚠️ Error in AND fetch for lang={lang}, country={country}: {e}")
                rows = []
            # --- OR fallback ---
            # Skipped for a single term: the OR query would be byte-identical
            # to the AND query and only duplicate the previous request.
            if len(rows) < min_results and or_query != and_query:
                try:
                    print(f"[DB7][DEBUG] Trying OR query lang={lang}, country={country}: {or_query}")
                    rows_or = fetch_search(
                        api_key=GNEWS_KEY,
                        bucket="db7",
                        q=or_query,
                        lang=lang,
                        country=country,
                        max_items=max_items,
                        pages=1,
                        sort_by="relevance",
                        date_from=date_from,
                        date_to=date_to,
                    )
                    print(f" ➡️ Found {len(rows_or)} articles (OR fallback)")
                    results.extend(rows_or)
                    time.sleep(1)
                except Exception as e:
                    print(f"⚠️ Error in OR fetch for lang={lang}, country={country}: {e}")
    # --- De-duplicate by URL before the (expensive) LLM translation ---
    seen_urls: set = set()
    unique_results: List[Dict[str, Any]] = []
    for a in results:
        url = a.get("url")
        if url:
            if url in seen_urls:
                continue
            seen_urls.add(url)
        # Articles without a URL are kept as-is: we cannot tell them apart.
        unique_results.append(a)
    # --- Translate fetched articles into English ---
    translated_results = []
    for a in unique_results:
        try:
            title_en = query_llm(f"Translate into English (short, precise):\n\n{a.get('title','')}")
            desc_en = query_llm(f"Translate into English (short, precise):\n\n{a.get('description','')}")
        except Exception as e:
            # Best-effort: fall back to the original-language text.
            print(f"⚠️ Translation failed for {a.get('url')}: {e}")
            title_en, desc_en = a.get("title", ""), a.get("description", "")
        a["title_en"] = title_en
        a["description_en"] = desc_en
        translated_results.append(a)
    return translated_results
# --- Step 4: Embed + rank ---
def embed_and_rank_articles(articles: List[Dict[str, Any]], query: str):
    """
    Embed article titles+descriptions into a FAISS index and rank them
    against *query* by vector similarity.

    Args:
        articles: Article dicts (as produced by fetch_articles_for_keywords).
        query: Free-form text to rank the articles against.

    Returns:
        (vectorstore, ranked): the FAISS index and a list of dicts ordered by
        the score FAISS returns from similarity_search_with_score.
        For an empty *articles* list returns (None, []) instead of crashing
        (FAISS.from_documents raises on an empty document list).
    """
    if not articles:
        return None, []
    docs = []
    for a in articles:
        content = (a.get("title") or "") + " " + (a.get("description") or "")
        docs.append(Document(
            page_content=content,
            metadata={
                "url": a.get("url"),
                "title": a.get("title"),
                "date": a.get("published_at"),
                "source": (a.get("source") or {}).get("name", ""),
            },
        ))
    vectorstore = FAISS.from_documents(docs, embeddings)
    # k=len(docs): rank every article, not just a top-k subset.
    docs_and_scores = vectorstore.similarity_search_with_score(query, k=len(docs))
    ranked = [
        {
            "title": d.metadata.get("title", ""),
            "url": d.metadata.get("url", ""),
            "date": d.metadata.get("date", ""),
            "source": d.metadata.get("source", ""),
            "snippet": d.page_content[:300],
            "score": float(score),
        }
        for d, score in docs_and_scores
    ]
    return vectorstore, ranked
# --- Step 5: Fetch top full articles ---
def fetch_top_full_articles(ranked: List[Dict[str, Any]], top_n: int = 2):
    """
    Download the full text of the best-ranked articles.

    Walks *ranked* in order and keeps fetching until *top_n* articles with a
    non-empty full text have been collected. (The previous implementation
    only tried the first top_n entries, so a single failed download meant
    returning fewer than top_n articles even when more candidates existed.)

    Args:
        ranked: Ranked article dicts from embed_and_rank_articles.
        top_n: Number of full-text articles to return.

    Returns:
        Up to top_n dicts, each with a "full_text" key added.
    """
    selected = []
    for r in ranked:
        if len(selected) >= top_n:
            break
        full_text = fetch_full_article(r["url"])
        if not full_text:
            # Unreachable/empty article: move on to the next candidate.
            continue
        selected.append({
            "title": r["title"],
            "url": r["url"],
            "date": r["date"],
            "source": r["source"],
            "full_text": full_text,
            "score": r["score"],
        })
    return selected
# build FAISS from the full-text articles, to be saved into sb7
def build_faiss_from_full_articles(full_articles: List[Dict[str, Any]]) -> FAISS:
    """
    Create an in-memory FAISS index over db7 full-text articles.

    Every article becomes one Document: its full_text is the page content,
    and url/title/date/source/score travel along as metadata.
    """
    documents = [
        Document(
            page_content=article["full_text"],
            metadata={
                "url": article.get("url"),
                "title": article.get("title"),
                "date": article.get("date"),
                "source": article.get("source"),
                "score": article.get("score"),
            },
        )
        for article in full_articles
    ]
    return FAISS.from_documents(documents, embeddings)
#merge it with db6
def merge_db7_into_db6(full_articles: List[Dict[str, Any]], username: str):
    """
    Merge a FAISS index built from db7 full-text articles into the user's
    db6 FAISS index stored in Supabase.

    Args:
        full_articles: Full-text article dicts (see fetch_top_full_articles).
        username: Owner of the db6 index in Supabase.

    Returns:
        The merged db6 index, or the freshly built index when no db6 index
        existed yet.

    Bug fixed: the original ended with
    ``return existing_db if 'existing' in locals() else new_db`` — the name
    checked was "existing" (never defined), so the merged index was never
    returned; callers always received only the new articles' index.
    """
    # 1. Build FAISS from the new full articles.
    new_db = build_faiss_from_full_articles(full_articles)
    # 2. Download + load the existing db6 index. Only these two steps can
    #    legitimately raise FileNotFoundError, so the try body stops there.
    try:
        tmp_dir = download_faiss_from_supabase("db6", username=username)
        existing_db = FAISS.load_local(tmp_dir, embeddings, allow_dangerous_deserialization=True)
    except FileNotFoundError:
        # db6 doesn't exist yet: persist the new index as db6 and return it.
        save_faiss_to_supabase(new_db, db_key="db6", username=username)
        print(f"✅ Created new db6 FAISS from {len(full_articles)} db7 full-text articles")
        return new_db
    # 3. Merge the new documents in and persist the combined index.
    existing_db.merge_from(new_db)
    save_faiss_to_supabase(existing_db, db_key="db6", username=username)
    print(f"✅ Merged {len(full_articles)} db7 full-text articles into db6 FAISS")
    return existing_db