from datetime import datetime, timedelta
from typing import Any, Dict, List
import time
import urllib.parse

from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

from config import GNEWS_KEY, HF_EMBEDDING_MODEL
from db3_utils import fetch_search, query_llm, translate_term
from db6_utils import fetch_full_article
from supabase_ie import download_faiss_from_supabase, save_faiss_to_supabase

# Shared sentence-embedding model for every FAISS index built in this module.
# Embeddings are L2-normalized so similarity scores are comparable across runs.
embeddings = HuggingFaceEmbeddings(
    model_name=HF_EMBEDDING_MODEL,
    encode_kwargs={"normalize_embeddings": True},
)
# """ # results = [] # now = datetime.utcnow().replace(microsecond=0) # date_from = (now - timedelta(days=lookback_days)).isoformat() + "Z" # date_to = now.isoformat() + "Z" # print(f"[DB7][DEBUG] date_from={date_from}, date_to={date_to}") # keywords = list(news_topic.keys()) # for lang in set(l for langs in news_topic.values() for l in langs.keys()): # terms = [news_topic[kw].get(lang, kw) for kw in keywords if news_topic[kw].get(lang)] # if not terms: # continue # # --- AND query --- # and_query = " AND ".join(terms) # print(f"[DB7][DEBUG] Trying AND query for lang={lang}: {and_query}") # try: # rows = fetch_search( # api_key=GNEWS_KEY, # bucket="db7", # q=and_query, # lang=lang, # country=None, # max_items=max_items, # pages=1, # sort_by="popularity", # date_from=date_from, # date_to=date_to, # ) # print(f" ➡️ Found {len(rows)} articles (AND)") # results.extend(rows) # time.sleep(1) # except Exception as e: # print(f"⚠️ Error in AND fetch for lang={lang}, query='{and_query}'") # print(f" date_from={date_from}, date_to={date_to}") # print(f" Exception: {e}") # rows = [] # # --- OR fallback --- # if len(rows) < min_results: # or_query = " OR ".join(terms) # print(f"[DB7][DEBUG] Trying OR query for lang={lang}: {or_query}") # try: # rows_or = fetch_search( # api_key=GNEWS_KEY, # bucket="db7", # q=or_query, # lang=lang, # country=None, # max_items=max_items, # pages=1, # sort_by="relevance", # date_from=date_from, # date_to=date_to, # ) # print(f" ➡️ Found {len(rows_or)} articles (OR fallback)") # results.extend(rows_or) # time.sleep(1) # except Exception as e: # print(f"⚠️ Error in OR fetch for lang={lang}, query='{or_query}'") # print(f" date_from={date_from}, date_to={date_to}") # print(f" Exception: {e}") # return results def fetch_articles_for_keywords( news_topic: list[str], lookback_days: int = 3, max_items: int = 50, min_results: int = 5, user_countries: list[str] | None = None, user_lang: list[str] | None = None ) -> List[Dict[str, Any]]: """ Fetch articles 
for a list of keywords in multiple languages and countries. Steps: 1. Translate news_topic into each user_lang (if not "en"). 2. Perform fetches for each (topic_lang, country, lang). 3. Always also fetch global English (country=None, lang="en"). 4. Translate found titles+descriptions back into English. """ results: List[Dict[str, Any]] = [] now = datetime.utcnow().replace(microsecond=0) date_from = (now - timedelta(days=lookback_days)).isoformat() + "Z" date_to = now.isoformat() + "Z" print(f"[DB7][DEBUG] date_from={date_from}, date_to={date_to}") if not news_topic: print("⚠️ No keywords provided") return results # Ensure we always include English langs = set(user_lang or []) langs.add("en") # --- Loop over each user language + country --- for lang in langs: # Translate topic terms if needed if lang == "en": translated_terms = news_topic else: translated_terms = [translate_term(term, lang) for term in news_topic] # Build AND and OR queries and_query = " AND ".join(translated_terms) or_query = " OR ".join(translated_terms) countries_to_check = user_countries if user_countries else [None] for country in countries_to_check: # --- AND query first --- try: print(f"[DB7][DEBUG] Trying AND query lang={lang}, country={country}: {and_query}") rows = fetch_search( api_key=GNEWS_KEY, bucket="db7", q=and_query, lang=lang, country=country, max_items=max_items, pages=1, sort_by="relevance", date_from=date_from, date_to=date_to, ) print(f" ➡️ Found {len(rows)} articles (AND)") results.extend(rows) time.sleep(1) except Exception as e: print(f"⚠️ Error in AND fetch for lang={lang}, country={country}: {e}") rows = [] # --- OR fallback --- if len(rows) < min_results: try: print(f"[DB7][DEBUG] Trying OR query lang={lang}, country={country}: {or_query}") rows_or = fetch_search( api_key=GNEWS_KEY, bucket="db7", q=or_query, lang=lang, country=country, max_items=max_items, pages=1, sort_by="relevance", date_from=date_from, date_to=date_to, ) print(f" ➡️ Found {len(rows_or)} articles (OR 
fallback)") results.extend(rows_or) time.sleep(1) except Exception as e: print(f"⚠️ Error in OR fetch for lang={lang}, country={country}: {e}") # --- Translate fetched articles into English --- translated_results = [] for a in results: try: title_en = query_llm(f"Translate into English (short, precise):\n\n{a.get('title','')}") desc_en = query_llm(f"Translate into English (short, precise):\n\n{a.get('description','')}") except Exception as e: print(f"⚠️ Translation failed for {a.get('url')}: {e}") title_en, desc_en = a.get("title",""), a.get("description","") a["title_en"] = title_en a["description_en"] = desc_en translated_results.append(a) return translated_results # --- Step 4: Embed + rank --- def embed_and_rank_articles(articles: List[Dict[str, Any]], query: str): docs = [] for a in articles: content = (a.get("title") or "") + " " + (a.get("description") or "") docs.append(Document( page_content=content, metadata={ "url": a.get("url"), "title": a.get("title"), "date": a.get("published_at"), "source": (a.get("source") or {}).get("name", "") } )) vectorstore = FAISS.from_documents(docs, embeddings) docs_and_scores = vectorstore.similarity_search_with_score(query, k=len(docs)) ranked = [] for d, score in docs_and_scores: ranked.append({ "title": d.metadata.get("title", ""), "url": d.metadata.get("url", ""), "date": d.metadata.get("date", ""), "source": d.metadata.get("source", ""), "snippet": d.page_content[:300], "score": float(score), }) return vectorstore, ranked # --- Step 5: Fetch top full articles --- def fetch_top_full_articles(ranked: List[Dict[str, Any]], top_n: int = 2): selected = [] for r in ranked[:top_n]: full_text = fetch_full_article(r["url"]) if not full_text: continue selected.append({ "title": r["title"], "url": r["url"], "date": r["date"], "source": r["source"], "full_text": full_text, "score": r["score"], }) return selected #build faiss from the full text articles save it in sb7 def build_faiss_from_full_articles(full_articles: List[Dict[str, 
Any]]) -> FAISS: """ Build an in-memory FAISS index from full text articles (db7). Each document stores the full_text in page_content and metadata with URL etc. """ docs = [] for a in full_articles: docs.append(Document( page_content=a["full_text"], metadata={ "url": a.get("url"), "title": a.get("title"), "date": a.get("date"), "source": a.get("source"), "score": a.get("score"), } )) return FAISS.from_documents(docs, embeddings) #merge it with db6 def merge_db7_into_db6(full_articles: List[Dict[str, Any]], username: str): """ Merge FAISS index built from db7 full text articles into db6 FAISS in Supabase. """ # 1. Build FAISS from the new full articles new_db = build_faiss_from_full_articles(full_articles) # 2. Download existing db6 FAISS from Supabase try: tmp_dir = download_faiss_from_supabase("db6", username=username) existing_db = FAISS.load_local(tmp_dir, embeddings, allow_dangerous_deserialization=True) existing_db.merge_from(new_db) save_faiss_to_supabase(existing_db, db_key="db6", username=username) print(f"✅ Merged {len(full_articles)} db7 full-text articles into db6 FAISS") except FileNotFoundError: # If db6 doesn’t exist yet, just create it save_faiss_to_supabase(new_db, db_key="db6", username=username) print(f"✅ Created new db6 FAISS from {len(full_articles)} db7 full-text articles") return existing_db if 'existing' in locals() else new_db