"""Fetch, translate, and persist news articles (GNews -> Supabase -> FAISS).

Pipeline helpers:
  * fetch_gnews_for_country / fetch_search / gnews_get -- GNews API access
  * translate_term / translate_articles_to_english     -- LLM translation
  * upsert_unmatched_articles / get_new_articles       -- Supabase table I/O
  * insert_new_articles_to_faiss                       -- FAISS index (db3)
"""

import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List

import requests
from dateutil import parser
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from openai import OpenAI
from supabase import create_client

from config import (
    GNEWS_KEY,
    HF_EMBEDDING_MODEL,
    MY_SUPABASE_UUID,
    OPENAI_CLASSIFIER_MODEL,
    SUPABASE_SERVICE_KEY,
    SUPABASE_URL,
)
from supabase_ie import (
    download_faiss_from_supabase,
    save_faiss_to_supabase,
    upload_text,
)

supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)

SEARCH_URL = "https://gnews.io/api/v4/search"
MODEL = OPENAI_CLASSIFIER_MODEL
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
gnews_key = GNEWS_KEY
DEFAULT_TIMEOUT = 25
SIMILARITY_THRESHOLD = 0.6
UA = {"User-Agent": "Genesis-NewsBot/1.0 (+internal-use)"}

embeddings = HuggingFaceEmbeddings(
    model_name=HF_EMBEDDING_MODEL,
    encode_kwargs={"normalize_embeddings": True},
)

# Lower-cased country name -> ISO 3166-1 alpha-2 code.
COUNTRY_NAME_TO_ISO2 = {
    # Anglosphere & Europe (core)
    "australia": "au", "new zealand": "nz", "united kingdom": "gb",
    "england": "gb", "scotland": "gb", "wales": "gb", "northern ireland": "gb",
    "ireland": "ie", "france": "fr", "germany": "de", "italy": "it",
    "spain": "es", "portugal": "pt", "netherlands": "nl", "belgium": "be",
    "switzerland": "ch", "austria": "at", "denmark": "dk", "sweden": "se",
    "norway": "no", "finland": "fi", "iceland": "is", "poland": "pl",
    "czech republic": "cz", "czechia": "cz", "slovakia": "sk", "hungary": "hu",
    "greece": "gr", "romania": "ro", "bulgaria": "bg", "croatia": "hr",
    "serbia": "rs", "slovenia": "si", "albania": "al",
    "bosnia and herzegovina": "ba", "north macedonia": "mk", "montenegro": "me",
    "estonia": "ee", "latvia": "lv", "lithuania": "lt", "luxembourg": "lu",
    "malta": "mt", "monaco": "mc", "andorra": "ad", "san marino": "sm",
    "vatican": "va", "russia": "ru", "ukraine": "ua",
    # Americas
    "united states": "us", "usa": "us", "canada": "ca", "mexico": "mx",
    "argentina": "ar", "brazil": "br", "chile": "cl", "colombia": "co",
    "peru": "pe", "uruguay": "uy", "paraguay": "py", "bolivia": "bo",
    "ecuador": "ec", "venezuela": "ve", "costa rica": "cr", "panama": "pa",
    "guatemala": "gt", "honduras": "hn", "el salvador": "sv", "nicaragua": "ni",
    "dominican republic": "do", "cuba": "cu", "jamaica": "jm",
    "trinidad and tobago": "tt",
    # Middle East & North Africa
    "turkey": "tr", "cyprus": "cy", "israel": "il", "palestine": "ps",
    "lebanon": "lb", "jordan": "jo", "saudi arabia": "sa",
    "united arab emirates": "ae", "uae": "ae", "qatar": "qa", "kuwait": "kw",
    "oman": "om", "bahrain": "bh", "iran": "ir", "iraq": "iq", "egypt": "eg",
    "morocco": "ma", "algeria": "dz", "tunisia": "tn",
    # Sub-Saharan Africa
    "south africa": "za", "nigeria": "ng", "ghana": "gh", "kenya": "ke",
    "ethiopia": "et", "tanzania": "tz", "uganda": "ug", "rwanda": "rw",
    "senegal": "sn", "ivory coast": "ci", "cote d'ivoire": "ci",
    "cameroon": "cm", "angola": "ao", "zambia": "zm", "zimbabwe": "zw",
    "botswana": "bw", "namibia": "na", "dr congo": "cd",
    "democratic republic of the congo": "cd",
    # Asia
    "china": "cn", "hong kong": "hk", "taiwan": "tw", "japan": "jp",
    "south korea": "kr", "korea, south": "kr", "north korea": "kp",
    "india": "in", "pakistan": "pk", "bangladesh": "bd", "sri lanka": "lk",
    "nepal": "np", "bhutan": "bt", "maldives": "mv", "indonesia": "id",
    "malaysia": "my", "singapore": "sg", "thailand": "th", "vietnam": "vn",
    "cambodia": "kh", "laos": "la", "myanmar": "mm", "philippines": "ph",
    "mongolia": "mn", "kazakhstan": "kz", "uzbekistan": "uz",
    # Oceania & Pacific
    "papua new guinea": "pg", "fiji": "fj", "samoa": "ws", "tonga": "to",
    "vanuatu": "vu", "solomon islands": "sb",
}

# Search buckets: every country is queried once per keyword (see
# fetch_gnews_for_country).
KEYWORDS = ["Culture", "Sport", "Soccer", "Football", "Science", "Politics", "News", "War"]

# ISO2 country code -> native language codes (first entry = primary).
# English is implicit everywhere and therefore never listed.
COUNTRY_LANG_MAP = {
    # EUROPE
    "at": ["de"],               # Austria
    "be": ["fr", "nl", "de"],   # Belgium
    "bg": ["bg"],               # Bulgaria
    "ch": ["de", "fr", "it"],   # Switzerland
    "cz": ["cs"],               # Czech Republic
    "de": ["de"],               # Germany
    "dk": ["da"],               # Denmark
    "ee": ["et"],               # Estonia
    "es": ["es"],               # Spain
    "fi": ["fi", "sv"],         # Finland
    "fr": ["fr"],               # France
    "gr": ["el"],               # Greece
    "hu": ["hu"],               # Hungary
    "it": ["it"],               # Italy
    "lt": ["lt"],               # Lithuania
    "lv": ["lv"],               # Latvia
    "nl": ["nl"],               # Netherlands
    "no": ["no"],               # Norway
    "pl": ["pl"],               # Poland
    "pt": ["pt"],               # Portugal
    "ro": ["ro"],               # Romania
    "ru": ["ru"],               # Russia
    "se": ["sv"],               # Sweden
    "sk": ["sk"],               # Slovakia
    "si": ["sl"],               # Slovenia (ISO2 "si"; "sl" is the language)
    "tr": ["tr"],               # Turkey
    "ua": ["uk"],               # Ukraine
    # AMERICAS (Spanish/Portuguese)
    "ar": ["es"],               # Argentina
    "bo": ["es"],               # Bolivia
    "br": ["pt"],               # Brazil
    "ca": ["fr"],               # Canada (keep FR only; EN is implicit)
    "cl": ["es"],               # Chile
    "co": ["es"],               # Colombia
    "cr": ["es"],               # Costa Rica
    "cu": ["es"],               # Cuba
    "do": ["es"],               # Dominican Republic
    "ec": ["es"],               # Ecuador
    "gt": ["es"],               # Guatemala
    "hn": ["es"],               # Honduras
    "mx": ["es"],               # Mexico
    "ni": ["es"],               # Nicaragua
    "pa": ["es"],               # Panama
    "pe": ["es"],               # Peru
    "py": ["es"],               # Paraguay
    "sv": ["es"],               # El Salvador
    "uy": ["es"],               # Uruguay
    "ve": ["es"],               # Venezuela
    # MIDDLE EAST & NORTH AFRICA
    "ae": ["ar"],               # UAE
    "dz": ["ar", "fr"],         # Algeria
    "eg": ["ar"],               # Egypt
    "il": ["he", "ar"],         # Israel
    "iq": ["ar"],               # Iraq
    "jo": ["ar"],               # Jordan
    "kw": ["ar"],               # Kuwait
    "lb": ["ar"],               # Lebanon
    "ma": ["ar", "fr"],         # Morocco
    "om": ["ar"],               # Oman
    "ps": ["ar"],               # Palestine
    "qa": ["ar"],               # Qatar
    "sa": ["ar"],               # Saudi Arabia
    "tn": ["ar", "fr"],         # Tunisia
    # SUB-SAHARAN AFRICA (only supported non-EN)
    "ao": ["pt"],               # Angola
    "cd": ["fr"],               # DR Congo
    "cm": ["fr"],               # Cameroon
    "ci": ["fr"],               # Côte d’Ivoire
    "mz": ["pt"],               # Mozambique
    "sn": ["fr"],               # Senegal
    # ASIA
    "cn": ["zh"],               # China
    "hk": ["zh"],               # Hong Kong
    "id": ["id"],               # Indonesia
    "in": ["hi"],               # India
    "jp": ["ja"],               # Japan
    "kr": ["ko"],               # South Korea
    "sg": ["zh"],               # Singapore (non-EN: Chinese)
    "th": ["th"],               # Thailand
    "tw": ["zh"],               # Taiwan
    "vn": ["vi"],               # Vietnam
}


def query_llm(prompt: str, model: str = MODEL) -> str:
    """Send a single-turn prompt to the OpenAI chat API and return the reply text.

    Temperature is pinned to 0 so translations/classifications are deterministic.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    return response.choices[0].message.content.strip()


def fetch_gnews_for_country(country: str, lookback_days: int = 3, max_items: int = 5) -> List[Dict[str, Any]]:
    """Fetch recent GNews articles for one ISO2 country across all KEYWORDS.

    Per keyword, two searches are issued: one scoped to the country (in the
    country's primary native language where supported, with the keyword
    LLM-translated), and one global English search. Results are deduped by
    URL. Per-keyword failures are logged and skipped, never raised.

    Args:
        country: ISO2 country code (key of COUNTRY_LANG_MAP).
        lookback_days: how far back the date window reaches.
        max_items: max articles per search call.

    Returns:
        List of normalized article dicts (see fetch_search).
    """
    if not gnews_key:
        print("⚠️ GNEWS_KEY not set, skipping fetch")
        return []

    # Language resolution: first native language, defaulting to English.
    native_langs = COUNTRY_LANG_MAP.get(country, ["en"])
    main_lang = native_langs[0] if isinstance(native_langs, list) else native_langs

    # Allow only GNews-supported languages; include 'el' for Greece.
    supported_langs = ["en", "it", "fr", "de", "es", "pt", "ru", "zh", "ja",
                       "nl", "no", "sv", "ro", "uk", "ar", "hi", "el"]
    if main_lang not in supported_langs:
        print(f"⚠️ Language '{main_lang}' not supported by GNews, falling back to English")
        main_lang = "en"

    date_to = datetime.utcnow()
    date_from = date_to - timedelta(days=lookback_days)
    date_from = date_from.isoformat() + "Z"
    date_to = date_to.isoformat() + "Z"

    results: List[Dict[str, Any]] = []
    for kw in KEYWORDS:
        try:
            if main_lang == "en":
                # 1) English, scoped to the country
                rows = fetch_search(
                    api_key=gnews_key, bucket=kw, q=kw, lang="en", country=country,
                    max_items=max_items, pages=1, sort_by="relevance",
                    date_from=date_from, date_to=date_to,
                )
                results.extend(rows)
                time.sleep(1)  # stay under the GNews rate limit
                # 2) Global English (no country)
                rows = fetch_search(
                    api_key=gnews_key, bucket=kw, q=kw, lang="en", country=None,
                    max_items=max_items, pages=1, sort_by="relevance",
                    date_from=date_from, date_to=date_to,
                )
                results.extend(rows)
                time.sleep(1)
            else:
                # 1) Native language, scoped to the country (keyword translated)
                rows = fetch_search(
                    api_key=gnews_key, bucket=kw, q=translate_term(kw, main_lang),
                    lang=main_lang, country=country,
                    max_items=max_items, pages=1, sort_by="relevance",
                    date_from=date_from, date_to=date_to,
                )
                results.extend(rows)
                time.sleep(1)
                # 2) Global English
                rows = fetch_search(
                    api_key=gnews_key, bucket=kw, q=kw, lang="en", country=None,
                    max_items=max_items, pages=1, sort_by="relevance",
                    date_from=date_from, date_to=date_to,
                )
                results.extend(rows)
                time.sleep(1)
        except Exception as e:
            # Never crash on per-keyword errors; move on to the next keyword.
            print(f"⚠️ Fetch error for {country}/{kw}: {e}")
            continue

    # Dedupe by 'id' (the URL); rows without an id are dropped.
    seen = set()
    deduped = []
    for r in results:
        rid = r.get("id")
        if rid and rid not in seen:
            seen.add(rid)
            deduped.append(r)
    return deduped


def translate_term(term: str, target_lang: str) -> str:
    """Translate a single search keyword into target_lang via the LLM.

    English (or empty) targets are returned unchanged to skip the API call.
    """
    if target_lang.lower() in ["en", ""]:
        return term
    prompt = f"Translate the following word into {target_lang}. Only give the translation:\n\n{term}"
    return query_llm(prompt, model=MODEL)


def translate_articles_to_english(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Translate article titles and descriptions into English.
    Keeps both native and English fields in each article dict
    (articles are mutated in place; 'title_en'/'description_en' are added).
    """
    out = []
    for a in articles:
        # Translate title
        if a.get("title"):
            a["title_en"] = query_llm(
                f"Translate this into English (short and precise, keep meaning):\n\n{a['title']}"
            )
        else:
            a["title_en"] = ""
        # Translate description (summary)
        if a.get("description"):
            a["description_en"] = query_llm(
                f"Translate this into English (short and precise, keep meaning):\n\n{a['description']}"
            )
        else:
            a["description_en"] = ""
        out.append(a)
    return out


def save_articles_to_txt(articles: List[Dict[str, Any]]):
    """
    Save all fetched articles to a text file in Supabase.

    English fields are preferred; native title/description are the fallback.
    """
    if not articles:
        print("⚠️ No articles to upload to Supabase (db3 summary)")
        return
    lines = []
    for a in articles:
        lines.append(f"{a.get('title_en') or a.get('title')}")
        lines.append(f"{a.get('description_en') or a.get('description')}")
        lines.append(f"URL: {a.get('url')}")
        lines.append(f"Date: {a.get('published_at', '')}")
        lines.append(f"Source: {(a.get('source') or {}).get('name', '')}")
        lines.append("-" * 80)
    content = "\n".join(lines)

    # Example bucket structure: shared/db3
    bucket = "Databases"
    path = "shared/db3/news_summary.txt"
    upload_text(bucket=bucket, path=path, text=content)
    print(f"✅ Saved {len(articles)} articles to Supabase bucket {bucket}/news_summary.txt")


def fetch_search(
    api_key: str,
    *,
    bucket: str,
    q: str,
    lang: str,
    country: str | None,
    max_items: int,
    pages: int,
    sort_by: str,
    date_from: str,
    date_to: str,
):
    """Run one GNews /search call and normalize the articles.

    Args:
        bucket: the keyword bucket this query belongs to (stored on each row).
        country: ISO2 code, or None for a global (unscoped) search.

    Returns:
        List of normalized article dicts; [] on any failure (graceful).
    """
    params = {
        "q": q,
        "max": max_items,
        "page": pages,
        "sortby": sort_by,
        "lang": lang,
        "from": date_from,
        "to": date_to,
        "apikey": api_key,
    }
    # Only send 'country' when the search is scoped (None means global).
    if country:
        params["country"] = country
    data = gnews_get(SEARCH_URL, params)
    if not data or "articles" not in data:
        return []  # graceful failure
    rows = []
    for a in data.get("articles", []):
        rows.append({
            "id": a.get("url"),  # URL doubles as a stable id for dedupe
            "title": a.get("title"),
            "description": a.get("description"),
            "url": a.get("url"),
            "published_at": a.get("publishedAt"),
            "source": a.get("source", {}),
            "lang": lang,
            "bucket": bucket,
            "country": country or "",
        })
    return rows


def dedupe_by_id(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop items whose 'id' was already seen, preserving first-seen order."""
    seen = set()
    out = []
    for it in items:
        if it["id"] in seen:
            continue
        seen.add(it["id"])
        out.append(it)
    return out


def gnews_get(url: str, params: dict, max_retries: int = 3, timeout: int = 15):
    """
    Safe GET with exponential backoff:
      - retries 429/5xx with 1s, 2s, 4s sleeps
      - returns parsed JSON or None on final failure
    """
    for attempt in range(max_retries):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            if r.status_code in (429, 500, 502, 503, 504):
                # transient: backoff then retry
                time.sleep(2 ** attempt)
                continue
            r.raise_for_status()
            return r.json()
        except requests.RequestException as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            print(f"⚠️ GNews request failed: {e}")
    return None


def insert_new_articles_to_faiss(articles: List[Dict[str, Any]]):
    """
    Insert new (existing=False) articles from unmatched_articles into FAISS (db3) in Supabase.

    Each article becomes a Document whose text is "title_en\\ndescription_en";
    unparseable publish dates are stored as None.
    """
    docs = []
    for a in articles:
        page_content = (a.get("title_en") or "") + "\n" + (a.get("description_en") or "")
        date_str = a.get("published_at")
        try:
            date_val = parser.parse(date_str).astimezone().isoformat() if date_str else None
        except Exception:
            date_val = None
        docs.append(Document(
            page_content=page_content,
            metadata={
                "title_native": a.get("title_native"),
                "title_en": a.get("title_en"),
                "summary_native": a.get("description_native"),
                "summary_en": a.get("description_en"),
                "url": a.get("url"),
                "date": date_val,
                "source": a.get("source", ""),
                "lang": a.get("lang", ""),
            },
        ))
    if not docs:
        print("⚠️ No new documents to insert into FAISS.")
        return

    new_db = FAISS.from_documents(docs, embeddings)
    try:
        # Download existing FAISS from Supabase (if present) and merge into it.
        tmp_dir = download_faiss_from_supabase("db3", username="None")
        existing = FAISS.load_local(tmp_dir, embeddings, allow_dangerous_deserialization=True)
        existing.merge_from(new_db)
        save_faiss_to_supabase(existing, db_key="db3", username="None")
        print(f"✅ Merged {len(docs)} new articles into FAISS (db3)")
    except FileNotFoundError:
        # If no FAISS exists, just upload the new one
        save_faiss_to_supabase(new_db, db_key="db3", username="None")
        print(f"✅ Created new FAISS (db3) with {len(docs)} articles")


def mark_all_existing():
    """Mark all rows as existing before inserting new ones."""
    supabase.table("unmatched_articles").update({"existing": True}).not_.is_("url", None).execute()


# Batch size for the existing=False follow-up UPDATE; adjust for very large lists.
CHUNK = 500


def upsert_unmatched_articles(articles: list[dict]) -> int:
    """
    Insert/update articles into public.unmatched_articles keyed by URL.
      - New URL -> inserted.
      - Existing URL -> updated with the new fields.
    After upsert, all batch URLs are set to existing=False (deterministic).

    REQUIREMENT (run once in DB):
        CREATE UNIQUE INDEX IF NOT EXISTS unmatched_articles_url_uidx
        ON public.unmatched_articles (url);

    Returns:
        Number of (URL-deduped) rows written.
    """
    # Build rows to write (ONLY include fields you want to overwrite)
    rows = []
    for a in articles:
        url = a.get("url")
        if not url:
            continue
        rows.append({
            "user_id": MY_SUPABASE_UUID,
            "url": url,
            "title_native": a.get("title"),
            "title_en": a.get("title_en"),
            "description_native": a.get("description"),
            "description_en": a.get("description_en"),
            "published_at": a.get("published_at"),  # ensure ISO string compatible with your column type
            "source": (a.get("source") or {}).get("name", ""),
            "lang": a.get("lang"),
            "existing": False,  # current batch should be marked as "new"
            "updated_at": datetime.utcnow().isoformat() + "Z",  # if column is timestamptz
        })
    if not rows:
        return 0

    # Dedupe by trimmed URL; whitespace-only URLs are dropped.
    deduped = {}
    for r in rows:
        u = r["url"].strip()
        if u:
            r["url"] = u
            deduped[u] = r
    rows = list(deduped.values())

    # 1) True UPSERT by URL (insert new, update existing).
    #    on_conflict="url" requires a UNIQUE index or PK on url.
    resp = supabase.table("unmatched_articles").upsert(
        rows,
        on_conflict="url"  # <- key difference from naive insert+catch
        # , returning="minimal"  # uncomment to reduce payload if supported
    ).execute()
    if getattr(resp, "error", None):
        raise RuntimeError(f"Upsert failed: {resp.error}")

    # 2) Deterministically set the whole batch to existing=False
    #    (covers the UPDATE-on-conflict path).
    urls = [r["url"] for r in rows]
    for i in range(0, len(urls), CHUNK):
        r = supabase.table("unmatched_articles") \
            .update({"existing": False}) \
            .in_("url", urls[i:i + CHUNK]) \
            .execute()
        if getattr(r, "error", None):
            raise RuntimeError(f"Force existing=False failed: {r.error}")
    return len(rows)


def get_new_articles():
    """Get new articles (existing=False)."""
    res = supabase.table("unmatched_articles").select("*").eq("existing", False).execute()
    return res.data if hasattr(res, "data") and res.data else []


def _read_countries() -> list[str]:
    """
    Read countries across ALL users from user_profiles table.
    Return a unique list of ISO2 country codes.
    """
    countries = set()
    try:
        res = supabase.table("user_profiles").select("living_country, origin_country").execute()
        if not res.data:
            return []
        for row in res.data:
            for field in ["living_country", "origin_country"]:
                c = row.get(field)
                if not c:
                    continue
                iso2 = COUNTRY_NAME_TO_ISO2.get(c.strip().lower())
                if iso2:
                    countries.add(iso2)
                else:
                    print(f"⚠️ No ISO2 mapping for '{c}'")
    except Exception as e:
        print(f"⚠️ Failed to fetch user_profiles: {e}")
    return list(countries)


# This function provides countries for a single user, used in db6 and db7
# pipelines (it is in db6_utils.py because of the input being here).
def get_user_countries_and_languages() -> tuple[list[str], list[str]]:
    """
    Resolve one user's countries into ISO2 codes and languages.
      - Reads living_country and origin_country from the FIRST user_profiles row.
      - Converts them into ISO2 codes (using COUNTRY_NAME_TO_ISO2).
      - Expands into languages (using COUNTRY_LANG_MAP).
    Always includes English ("en").

    Returns:
        user_countries: list of ISO2 codes (e.g., ["au","it"])
        user_languages: list of language codes (e.g., ["en","it"])
    """
    user_countries: list[str] = []
    user_languages: list[str] = ["en"]  # English always included
    try:
        res = supabase.table("user_profiles").select("living_country, origin_country").execute()
        if not res.data:
            return [], ["en"]
        row = res.data[0]
        for field in ["living_country", "origin_country"]:
            c = row.get(field)
            if not c:
                continue
            iso2 = COUNTRY_NAME_TO_ISO2.get(c.strip().lower())
            if iso2 and iso2 not in user_countries:
                user_countries.append(iso2)
                # Add languages for this country if available.
                for lang in COUNTRY_LANG_MAP.get(iso2, []):
                    if lang not in user_languages:
                        user_languages.append(lang)
            elif not iso2:
                print(f"⚠️ No ISO2 mapping for '{c}'")
    except Exception as e:
        print(f"⚠️ Failed to fetch countries/langs for user: {e}")
    return user_countries, user_languages