# NOTE: "Spaces: Sleeping" is Hugging Face Spaces status text captured along
# with this source dump — it is not part of the module.
| from typing import List, Dict, Any | |
| from datetime import datetime, timedelta | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.documents import Document | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from config import HF_EMBEDDING_MODEL, GNEWS_KEY | |
| from db3_utils import query_llm, translate_term, fetch_search | |
| from db6_utils import fetch_full_article | |
| from supabase_ie import save_faiss_to_supabase, download_faiss_from_supabase | |
| import urllib.parse | |
| import time | |
# Shared embedding model used by every FAISS operation in this module.
# normalize_embeddings=True produces unit-length vectors, so FAISS L2
# distances behave like cosine distances.
embeddings = HuggingFaceEmbeddings(
    model_name=HF_EMBEDDING_MODEL,
    encode_kwargs={"normalize_embeddings": True},
)
| # # --- Step 1: Extract keywords --- | |
| # def extract_keywords_from_query(query: str, max_terms: int = 3) -> List[str]: | |
| # """ | |
| # Use LLM to extract keywords from a free-form query. | |
| # """ | |
| # prompt = ( | |
| # f"Extract up to {max_terms} concise keywords or short phrases from this query for a news search:\n\n{query}\n\n" | |
| # "Return as a comma-separated list." | |
| # ) | |
| # result = query_llm(prompt) | |
| # return [kw.strip() for kw in result.split(",") if kw.strip()] | |
| # --- Step 2: Translate keywords --- | |
| # def translate_keywords(keywords: List[str], langs: List[str]) -> Dict[str, Dict[str, str]]: | |
| # """ | |
| # Map keywords into different languages. | |
| # Returns {keyword: {lang: translated_term}} | |
| # """ | |
| # translated_map: Dict[str, Dict[str, str]] = {} | |
| # for kw in keywords: | |
| # translated_map[kw] = {} | |
| # for lang in langs: | |
| # if lang == "en": | |
| # translated_map[kw][lang] = kw | |
| # else: | |
| # translated_map[kw][lang] = translate_term(kw, lang) | |
| # return translated_map | |
| # --- Step 3: Fetch articles from GNews --- | |
| # def debug_fetch(): | |
| # """ | |
| # Temporary debug function to test GNews queries with different parameter tweaks. | |
| # """ | |
| # test_variants = [ | |
| # {"label": "original", "q": "Sinner AND Beijing ATP tournament AND performance", "use_dates": True, "country": None}, | |
| # {"label": "no_dates", "q": "Sinner AND Beijing ATP tournament AND performance", "use_dates": False, "country": None}, | |
| # {"label": "simple_keyword", "q": "sport", "use_dates": True, "country": None}, | |
| # {"label": "with_country", "q": "sport", "use_dates": True, "country": "us"}, | |
| # {"label": "sort_relevance", "q": "sport", "use_dates": True, "country": None}, | |
| # ] | |
| # now = datetime.utcnow().replace(microsecond=0) | |
| # date_from = (now - timedelta(days=3)).isoformat() + "Z" | |
| # date_to = now.isoformat() + "Z" | |
| # results = {} | |
| # for variant in test_variants: | |
| # print(f"\n[DEBUG FETCH] Running variant: {variant['label']}") | |
| # try: | |
| # rows = fetch_search( | |
| # api_key=None, | |
| # bucket="db7", | |
| # q=variant["q"], | |
| # lang="en", | |
| # country=variant["country"], | |
| # max_items=10, | |
| # #pages=1, | |
| # #sort_by=variant["sort_by"], | |
| # date_from=None, #date_from if variant["use_dates"] else None, | |
| # date_to=None, #date_to if variant["use_dates"] else None, | |
| # ) | |
| # print(f" ➡️ Found {len(rows)} articles") | |
| # results[variant["label"]] = len(rows) | |
| # except Exception as e: | |
| # print(f"⚠️ Error in {variant['label']} fetch: {e}") | |
| # results[variant["label"]] = "error" | |
| # return results | |
| # temporary change | |
| # def fetch_articles_for_keywords( | |
| # news_topic: Dict[str, Dict[str, str]], | |
| # lookback_days: int = 3, | |
| # max_items: int = 50, | |
| # min_results: int = 5 | |
| # ) -> List[Dict[str, Any]]: | |
| # """ | |
| # Fetch articles for each keyword/lang pair using GNews. | |
| # Strategy: | |
| # 1. Try an AND query (all keywords together). | |
| # 2. If too few results, fall back to OR query. | |
| # Debug: | |
| # - Print final date_from/date_to strings. | |
| # - Print queries before calling fetch_search. | |
| # - Print full URL params if request fails. | |
| # """ | |
| # results = [] | |
| # now = datetime.utcnow().replace(microsecond=0) | |
| # date_from = (now - timedelta(days=lookback_days)).isoformat() + "Z" | |
| # date_to = now.isoformat() + "Z" | |
| # print(f"[DB7][DEBUG] date_from={date_from}, date_to={date_to}") | |
| # keywords = list(news_topic.keys()) | |
| # for lang in set(l for langs in news_topic.values() for l in langs.keys()): | |
| # terms = [news_topic[kw].get(lang, kw) for kw in keywords if news_topic[kw].get(lang)] | |
| # if not terms: | |
| # continue | |
| # # --- AND query --- | |
| # and_query = " AND ".join(terms) | |
| # print(f"[DB7][DEBUG] Trying AND query for lang={lang}: {and_query}") | |
| # try: | |
| # rows = fetch_search( | |
| # api_key=GNEWS_KEY, | |
| # bucket="db7", | |
| # q=and_query, | |
| # lang=lang, | |
| # country=None, | |
| # max_items=max_items, | |
| # pages=1, | |
| # sort_by="popularity", | |
| # date_from=date_from, | |
| # date_to=date_to, | |
| # ) | |
| # print(f" ➡️ Found {len(rows)} articles (AND)") | |
| # results.extend(rows) | |
| # time.sleep(1) | |
| # except Exception as e: | |
| # print(f"⚠️ Error in AND fetch for lang={lang}, query='{and_query}'") | |
| # print(f" date_from={date_from}, date_to={date_to}") | |
| # print(f" Exception: {e}") | |
| # rows = [] | |
| # # --- OR fallback --- | |
| # if len(rows) < min_results: | |
| # or_query = " OR ".join(terms) | |
| # print(f"[DB7][DEBUG] Trying OR query for lang={lang}: {or_query}") | |
| # try: | |
| # rows_or = fetch_search( | |
| # api_key=GNEWS_KEY, | |
| # bucket="db7", | |
| # q=or_query, | |
| # lang=lang, | |
| # country=None, | |
| # max_items=max_items, | |
| # pages=1, | |
| # sort_by="relevance", | |
| # date_from=date_from, | |
| # date_to=date_to, | |
| # ) | |
| # print(f" ➡️ Found {len(rows_or)} articles (OR fallback)") | |
| # results.extend(rows_or) | |
| # time.sleep(1) | |
| # except Exception as e: | |
| # print(f"⚠️ Error in OR fetch for lang={lang}, query='{or_query}'") | |
| # print(f" date_from={date_from}, date_to={date_to}") | |
| # print(f" Exception: {e}") | |
| # return results | |
def fetch_articles_for_keywords(
    news_topic: list[str],
    lookback_days: int = 3,
    max_items: int = 50,
    min_results: int = 5,
    user_countries: list[str] | None = None,
    user_lang: list[str] | None = None
) -> List[Dict[str, Any]]:
    """
    Fetch articles for a list of keywords in multiple languages and countries.

    Steps:
        1. Translate news_topic into each user_lang (if not "en").
        2. Fetch for each (lang, country) pair: AND query first, then an OR
           fallback when the AND query returns fewer than min_results rows.
        3. "en" is always forced into the language set.
        4. Translate found titles+descriptions into English via the LLM.

    Args:
        news_topic: Keywords/short phrases describing the topic.
        lookback_days: Size of the search window, in days before now.
        max_items: Maximum articles requested per fetch_search call.
        min_results: AND-query hit count below which the OR fallback fires.
        user_countries: Optional GNews country codes; None means global.
        user_lang: Optional language codes; "en" is always added.

    Returns:
        De-duplicated article dicts, each augmented with "title_en" and
        "description_en" keys.

    Note:
        BUGFIX: the previous version extended results unconditionally, so the
        overlapping AND/OR fetches (and repeated lang/country passes) produced
        duplicate articles, each of which then cost two extra LLM translation
        calls. Rows are now de-duplicated by URL as they arrive.
    """
    results: List[Dict[str, Any]] = []
    seen_urls: set = set()  # URLs already collected, across all fetches

    def _collect(rows: List[Dict[str, Any]]) -> None:
        """Append rows whose URL has not been seen yet (keeps URL-less rows)."""
        for row in rows:
            url = row.get("url")
            if url:
                if url in seen_urls:
                    continue
                seen_urls.add(url)
            results.append(row)

    now = datetime.utcnow().replace(microsecond=0)
    date_from = (now - timedelta(days=lookback_days)).isoformat() + "Z"
    date_to = now.isoformat() + "Z"
    print(f"[DB7][DEBUG] date_from={date_from}, date_to={date_to}")

    if not news_topic:
        print("⚠️ No keywords provided")
        return results

    # Ensure we always include English
    langs = set(user_lang or [])
    langs.add("en")

    # --- Loop over each user language + country ---
    for lang in langs:
        # Translate topic terms if needed
        if lang == "en":
            translated_terms = news_topic
        else:
            translated_terms = [translate_term(term, lang) for term in news_topic]

        # Build AND and OR queries
        and_query = " AND ".join(translated_terms)
        or_query = " OR ".join(translated_terms)
        countries_to_check = user_countries if user_countries else [None]

        for country in countries_to_check:
            # --- AND query first ---
            try:
                print(f"[DB7][DEBUG] Trying AND query lang={lang}, country={country}: {and_query}")
                rows = fetch_search(
                    api_key=GNEWS_KEY,
                    bucket="db7",
                    q=and_query,
                    lang=lang,
                    country=country,
                    max_items=max_items,
                    pages=1,
                    sort_by="relevance",
                    date_from=date_from,
                    date_to=date_to,
                )
                print(f" ➡️ Found {len(rows)} articles (AND)")
                _collect(rows)
                time.sleep(1)  # stay under the GNews rate limit
            except Exception as e:
                print(f"⚠️ Error in AND fetch for lang={lang}, country={country}: {e}")
                rows = []

            # --- OR fallback when the AND query was too narrow ---
            if len(rows) < min_results:
                try:
                    print(f"[DB7][DEBUG] Trying OR query lang={lang}, country={country}: {or_query}")
                    rows_or = fetch_search(
                        api_key=GNEWS_KEY,
                        bucket="db7",
                        q=or_query,
                        lang=lang,
                        country=country,
                        max_items=max_items,
                        pages=1,
                        sort_by="relevance",
                        date_from=date_from,
                        date_to=date_to,
                    )
                    print(f" ➡️ Found {len(rows_or)} articles (OR fallback)")
                    _collect(rows_or)
                    time.sleep(1)  # stay under the GNews rate limit
                except Exception as e:
                    print(f"⚠️ Error in OR fetch for lang={lang}, country={country}: {e}")

    # --- Translate fetched articles into English ---
    translated_results = []
    for a in results:
        try:
            title_en = query_llm(f"Translate into English (short, precise):\n\n{a.get('title','')}")
            desc_en = query_llm(f"Translate into English (short, precise):\n\n{a.get('description','')}")
        except Exception as e:
            # Best-effort: fall back to the original text on translation failure.
            print(f"⚠️ Translation failed for {a.get('url')}: {e}")
            title_en, desc_en = a.get("title",""), a.get("description","")
        a["title_en"] = title_en
        a["description_en"] = desc_en
        translated_results.append(a)

    return translated_results
| # --- Step 4: Embed + rank --- | |
def embed_and_rank_articles(articles: List[Dict[str, Any]], query: str):
    """
    Embed article titles+descriptions and rank them by similarity to *query*.

    Args:
        articles: Article dicts with "title", "description", "url",
            "published_at" and a nested "source" dict.
        query: Free-form text the articles are ranked against.

    Returns:
        (vectorstore, ranked): the in-memory FAISS index (None when *articles*
        is empty) and a list of result dicts ordered by ascending distance
        score (lower = more similar, since embeddings are normalized).

    Note:
        BUGFIX: previously an empty *articles* list crashed —
        FAISS.from_documents raises on an empty document list.
    """
    if not articles:
        return None, []

    docs = []
    for a in articles:
        # Title + description is the searchable text; metadata keeps fields
        # needed to render/fetch the article later.
        content = (a.get("title") or "") + " " + (a.get("description") or "")
        docs.append(Document(
            page_content=content,
            metadata={
                "url": a.get("url"),
                "title": a.get("title"),
                "date": a.get("published_at"),
                "source": (a.get("source") or {}).get("name", "")
            }
        ))

    vectorstore = FAISS.from_documents(docs, embeddings)
    # k=len(docs): score every article against the query.
    docs_and_scores = vectorstore.similarity_search_with_score(query, k=len(docs))

    ranked = []
    for d, score in docs_and_scores:
        ranked.append({
            "title": d.metadata.get("title", ""),
            "url": d.metadata.get("url", ""),
            "date": d.metadata.get("date", ""),
            "source": d.metadata.get("source", ""),
            "snippet": d.page_content[:300],
            "score": float(score),
        })
    return vectorstore, ranked
| # --- Step 5: Fetch top full articles --- | |
def fetch_top_full_articles(ranked: List[Dict[str, Any]], top_n: int = 2):
    """
    Download the full text of the top-ranked articles.

    Articles whose full text cannot be retrieved are skipped, so the result
    may hold fewer than *top_n* entries.

    Args:
        ranked: Ranked article dicts (as produced by embed_and_rank_articles).
        top_n: How many of the top entries to attempt to fetch.

    Returns:
        List of dicts carrying title/url/date/source/score plus "full_text".
    """
    enriched = []
    for entry in ranked[:top_n]:
        body = fetch_full_article(entry["url"])
        if body:
            enriched.append({
                "title": entry["title"],
                "url": entry["url"],
                "date": entry["date"],
                "source": entry["source"],
                "full_text": body,
                "score": entry["score"],
            })
    return enriched
| #build faiss from the full text articles save it in sb7 | |
def build_faiss_from_full_articles(full_articles: List[Dict[str, Any]]) -> FAISS:
    """
    Build an in-memory FAISS index from full-text articles (db7).

    Each document stores the article's full_text as page_content, with
    url/title/date/source/score carried as metadata.
    """
    documents = [
        Document(
            page_content=article["full_text"],
            metadata={
                "url": article.get("url"),
                "title": article.get("title"),
                "date": article.get("date"),
                "source": article.get("source"),
                "score": article.get("score"),
            },
        )
        for article in full_articles
    ]
    return FAISS.from_documents(documents, embeddings)
| #merge it with db6 | |
def merge_db7_into_db6(full_articles: List[Dict[str, Any]], username: str):
    """
    Merge a FAISS index built from db7 full-text articles into the user's
    db6 FAISS index stored in Supabase.

    Args:
        full_articles: Full-text article dicts (see fetch_top_full_articles).
        username: Owner of the db6 index in Supabase.

    Returns:
        The FAISS index that was saved: the merged existing db6 index when one
        was found, otherwise the freshly built index.

    Note:
        BUGFIX: the old return line tested `'existing' in locals()`, but the
        variable is named `existing_db`, so the merged index was never
        returned — callers always received the new, unmerged index.
    """
    # 1. Build FAISS from the new full articles
    new_db = build_faiss_from_full_articles(full_articles)

    # 2. Download existing db6 FAISS from Supabase; if it doesn't exist yet,
    #    just create it from the new index.
    try:
        tmp_dir = download_faiss_from_supabase("db6", username=username)
        existing_db = FAISS.load_local(tmp_dir, embeddings, allow_dangerous_deserialization=True)
    except FileNotFoundError:
        save_faiss_to_supabase(new_db, db_key="db6", username=username)
        print(f"✅ Created new db6 FAISS from {len(full_articles)} db7 full-text articles")
        return new_db

    # 3. Merge the new articles into the existing index and persist it.
    existing_db.merge_from(new_db)
    save_faiss_to_supabase(existing_db, db_key="db6", username=username)
    print(f"✅ Merged {len(full_articles)} db7 full-text articles into db6 FAISS")
    return existing_db