# Socrates_docker / db3_utils.py
# (Hugging Face commit 0b170f9 — "remove fet_user_id and get_username", by alesamodio)
import os, time, requests
from typing import List, Dict, Any
from datetime import datetime, timedelta
from openai import OpenAI
from langchain_community.embeddings import HuggingFaceEmbeddings
from typing import List, Dict
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from dateutil import parser
from supabase import create_client
import time
import requests
from supabase_ie import upload_text, download_faiss_from_supabase, save_faiss_to_supabase
from config import SUPABASE_URL, SUPABASE_SERVICE_KEY, OPENAI_CLASSIFIER_MODEL, GNEWS_KEY,HF_EMBEDDING_MODEL, MY_SUPABASE_UUID
# Module-level clients and configuration, created once at import time.
# Supabase client authenticated with the service-role key (full table/bucket access).
supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
# Base URL of the GNews v4 search endpoint.
SEARCH_URL = "https://gnews.io/api/v4/search"
# OpenAI model used for the translation/classification prompts in this module.
MODEL = OPENAI_CLASSIFIER_MODEL
# OpenAI client; the API key comes from the environment, not from config.py.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
gnews_key = GNEWS_KEY
# Default HTTP timeout in seconds (NOTE(review): gnews_get below uses its own timeout=15 default — confirm which is intended).
DEFAULT_TIMEOUT = 25
# Similarity cutoff — presumably used by downstream matching; not referenced in this chunk.
SIMILARITY_THRESHOLD = 0.6
# Custom User-Agent header (NOTE(review): not passed to any requests call in this chunk).
UA = {"User-Agent": "Genesis-NewsBot/1.0 (+internal-use)"}
# Sentence-embedding model; outputs are L2-normalized so inner product == cosine similarity.
embeddings = HuggingFaceEmbeddings(
model_name= HF_EMBEDDING_MODEL,
encode_kwargs={"normalize_embeddings": True},
)
# Mapping from lowercase English country names (as stored in user_profiles)
# to ISO 3166-1 alpha-2 codes accepted by the GNews API.
# Fix: the original literal repeated ~23 keys at the end ("portugal", "japan",
# "france", ...) with identical values — duplicate dict keys are silently
# overwritten in Python, so they were dead weight; only "russia" and "ukraine"
# were genuinely new and are kept. "mozambique" is added so the existing
# COUNTRY_LANG_MAP["mz"] entry is reachable.
COUNTRY_NAME_TO_ISO2 = {
    # Anglosphere & Europe (core)
    "australia": "au",
    "new zealand": "nz",
    "united kingdom": "gb",
    "england": "gb",
    "scotland": "gb",
    "wales": "gb",
    "northern ireland": "gb",
    "ireland": "ie",
    "france": "fr",
    "germany": "de",
    "italy": "it",
    "spain": "es",
    "portugal": "pt",
    "netherlands": "nl",
    "belgium": "be",
    "switzerland": "ch",
    "austria": "at",
    "denmark": "dk",
    "sweden": "se",
    "norway": "no",
    "finland": "fi",
    "iceland": "is",
    "poland": "pl",
    "czech republic": "cz",
    "czechia": "cz",
    "slovakia": "sk",
    "hungary": "hu",
    "greece": "gr",
    "romania": "ro",
    "bulgaria": "bg",
    "croatia": "hr",
    "serbia": "rs",
    "slovenia": "si",
    "albania": "al",
    "bosnia and herzegovina": "ba",
    "north macedonia": "mk",
    "montenegro": "me",
    "estonia": "ee",
    "latvia": "lv",
    "lithuania": "lt",
    "luxembourg": "lu",
    "malta": "mt",
    "monaco": "mc",
    "andorra": "ad",
    "san marino": "sm",
    "vatican": "va",
    "russia": "ru",
    "ukraine": "ua",
    # Americas
    "united states": "us",
    "usa": "us",
    "canada": "ca",
    "mexico": "mx",
    "argentina": "ar",
    "brazil": "br",
    "chile": "cl",
    "colombia": "co",
    "peru": "pe",
    "uruguay": "uy",
    "paraguay": "py",
    "bolivia": "bo",
    "ecuador": "ec",
    "venezuela": "ve",
    "costa rica": "cr",
    "panama": "pa",
    "guatemala": "gt",
    "honduras": "hn",
    "el salvador": "sv",
    "nicaragua": "ni",
    "dominican republic": "do",
    "cuba": "cu",
    "jamaica": "jm",
    "trinidad and tobago": "tt",
    # Middle East & North Africa
    "turkey": "tr",
    "cyprus": "cy",
    "israel": "il",
    "palestine": "ps",
    "lebanon": "lb",
    "jordan": "jo",
    "saudi arabia": "sa",
    "united arab emirates": "ae",
    "uae": "ae",
    "qatar": "qa",
    "kuwait": "kw",
    "oman": "om",
    "bahrain": "bh",
    "iran": "ir",
    "iraq": "iq",
    "egypt": "eg",
    "morocco": "ma",
    "algeria": "dz",
    "tunisia": "tn",
    # Sub-Saharan Africa
    "south africa": "za",
    "nigeria": "ng",
    "ghana": "gh",
    "kenya": "ke",
    "ethiopia": "et",
    "tanzania": "tz",
    "uganda": "ug",
    "rwanda": "rw",
    "senegal": "sn",
    "ivory coast": "ci",
    "cote d'ivoire": "ci",
    "cameroon": "cm",
    "angola": "ao",
    "mozambique": "mz",
    "zambia": "zm",
    "zimbabwe": "zw",
    "botswana": "bw",
    "namibia": "na",
    "dr congo": "cd",
    "democratic republic of the congo": "cd",
    # Asia
    "china": "cn",
    "hong kong": "hk",
    "taiwan": "tw",
    "japan": "jp",
    "south korea": "kr",
    "korea, south": "kr",
    "north korea": "kp",
    "india": "in",
    "pakistan": "pk",
    "bangladesh": "bd",
    "sri lanka": "lk",
    "nepal": "np",
    "bhutan": "bt",
    "maldives": "mv",
    "indonesia": "id",
    "malaysia": "my",
    "singapore": "sg",
    "thailand": "th",
    "vietnam": "vn",
    "cambodia": "kh",
    "laos": "la",
    "myanmar": "mm",
    "philippines": "ph",
    "mongolia": "mn",
    "kazakhstan": "kz",
    "uzbekistan": "uz",
    # Oceania & Pacific
    "papua new guinea": "pg",
    "fiji": "fj",
    "samoa": "ws",
    "tonga": "to",
    "vanuatu": "vu",
    "solomon islands": "sb",
}
# Topic buckets queried against GNews for every country.
KEYWORDS = ["Culture", "Sport", "Soccer", "Football", "Science", "Politics", "News", "War"]
# ISO2 country code -> native language codes to query in (English is implicit).
# Fix: Slovenia was keyed as "sl", but its ISO 3166-1 alpha-2 code is "si"
# (which is also what COUNTRY_NAME_TO_ISO2 produces), so the entry was
# unreachable; corrected to "si" (the Slovene LANGUAGE code remains "sl").
COUNTRY_LANG_MAP = {
    # EUROPE
    "at": ["de"],  # Austria
    "be": ["fr", "nl", "de"],  # Belgium
    "bg": ["bg"],  # Bulgaria
    "ch": ["de", "fr", "it"],  # Switzerland
    "cz": ["cs"],  # Czech Republic
    "de": ["de"],  # Germany
    "dk": ["da"],  # Denmark
    "ee": ["et"],  # Estonia
    "es": ["es"],  # Spain
    "fi": ["fi", "sv"],  # Finland
    "fr": ["fr"],  # France
    "gr": ["el"],  # Greece
    "hu": ["hu"],  # Hungary
    "it": ["it"],  # Italy
    "lt": ["lt"],  # Lithuania
    "lv": ["lv"],  # Latvia
    "nl": ["nl"],  # Netherlands
    "no": ["no"],  # Norway
    "pl": ["pl"],  # Poland
    "pt": ["pt"],  # Portugal
    "ro": ["ro"],  # Romania
    "ru": ["ru"],  # Russia
    "se": ["sv"],  # Sweden
    "sk": ["sk"],  # Slovakia
    "si": ["sl"],  # Slovenia (country code "si", language code "sl")
    "tr": ["tr"],  # Turkey
    "ua": ["uk"],  # Ukraine
    # AMERICAS (Spanish/Portuguese)
    "ar": ["es"],  # Argentina
    "bo": ["es"],  # Bolivia
    "br": ["pt"],  # Brazil
    "ca": ["fr"],  # Canada (keep FR only; EN is implicit)
    "cl": ["es"],  # Chile
    "co": ["es"],  # Colombia
    "cr": ["es"],  # Costa Rica
    "cu": ["es"],  # Cuba
    "do": ["es"],  # Dominican Republic
    "ec": ["es"],  # Ecuador
    "gt": ["es"],  # Guatemala
    "hn": ["es"],  # Honduras
    "mx": ["es"],  # Mexico
    "ni": ["es"],  # Nicaragua
    "pa": ["es"],  # Panama
    "pe": ["es"],  # Peru
    "py": ["es"],  # Paraguay
    "sv": ["es"],  # El Salvador
    "uy": ["es"],  # Uruguay
    "ve": ["es"],  # Venezuela
    # MIDDLE EAST & NORTH AFRICA
    "ae": ["ar"],  # UAE
    "dz": ["ar", "fr"],  # Algeria
    "eg": ["ar"],  # Egypt
    "il": ["he", "ar"],  # Israel
    "iq": ["ar"],  # Iraq
    "jo": ["ar"],  # Jordan
    "kw": ["ar"],  # Kuwait
    "lb": ["ar"],  # Lebanon
    "ma": ["ar", "fr"],  # Morocco
    "om": ["ar"],  # Oman
    "ps": ["ar"],  # Palestine
    "qa": ["ar"],  # Qatar
    "sa": ["ar"],  # Saudi Arabia
    "tn": ["ar", "fr"],  # Tunisia
    # SUB-SAHARAN AFRICA (only supported non-EN)
    "ao": ["pt"],  # Angola
    "cd": ["fr"],  # DR Congo
    "cm": ["fr"],  # Cameroon
    "ci": ["fr"],  # Côte d’Ivoire
    "mz": ["pt"],  # Mozambique
    "sn": ["fr"],  # Senegal
    # ASIA
    "cn": ["zh"],  # China
    "hk": ["zh"],  # Hong Kong
    "id": ["id"],  # Indonesia
    "in": ["hi"],  # India
    "jp": ["ja"],  # Japan
    "kr": ["ko"],  # South Korea
    "sg": ["zh"],  # Singapore (non-EN: Chinese)
    "th": ["th"],  # Thailand
    "tw": ["zh"],  # Taiwan
    "vn": ["vi"],  # Vietnam
}
def query_llm(prompt: str, model: str = MODEL) -> str:
    """Send a single-turn user prompt to OpenAI and return the stripped reply text.

    Uses temperature 0 for deterministic output; relies on the module-level
    OpenAI ``client``.
    """
    completion = client.chat.completions.create(
        model=model,
        temperature=0,
        messages=[{"role": "user", "content": prompt}],
    )
    return completion.choices[0].message.content.strip()
def fetch_gnews_for_country(country: str, lookback_days: int = 3, max_items: int = 5) -> List[Dict[str, Any]]:
    """Fetch recent GNews articles for one country across all KEYWORDS.

    For each keyword, two queries are issued: one scoped to the country
    (in the country's primary supported language, with the keyword translated
    when that language is not English) and one global English query. Results
    are deduplicated by their 'id' (the article URL), preserving order.

    Returns an empty list when GNEWS_KEY is unset; per-keyword fetch errors
    are logged and skipped rather than raised.
    """
    if not gnews_key:
        print("⚠️ GNEWS_KEY not set, skipping fetch")
        return []
    # Resolve the country's primary native language (default: English).
    native_langs = COUNTRY_LANG_MAP.get(country, ["en"])
    main_lang = native_langs[0] if isinstance(native_langs, list) else native_langs
    # Allow only GNews-supported languages; include 'el' for Greece
    supported_langs = ["en","it","fr","de","es","pt","ru","zh","ja","nl","no","sv","ro","uk","ar","hi","el"]
    if main_lang not in supported_langs:
        print(f"⚠️ Language '{main_lang}' not supported by GNews, falling back to English")
        main_lang = "en"
    # Time window: [now - lookback_days, now], formatted as ISO8601 + "Z".
    now = datetime.utcnow()
    window_from = (now - timedelta(days=lookback_days)).isoformat() + "Z"
    window_to = now.isoformat() + "Z"
    collected: List[Dict[str, Any]] = []
    for kw in KEYWORDS:
        try:
            # Pass 1: country-scoped (native language when available);
            # pass 2: global English (no country filter).
            if main_lang == "en":
                passes = [(kw, "en", country), (kw, "en", None)]
            else:
                passes = [(translate_term(kw, main_lang), main_lang, country), (kw, "en", None)]
            for query, lang, scope in passes:
                collected.extend(fetch_search(
                    api_key=gnews_key, bucket=kw, q=query, lang=lang, country=scope,
                    max_items=max_items, pages=1, sort_by="relevance",
                    date_from=window_from, date_to=window_to
                ))
                time.sleep(1)  # stay polite with the free-tier rate limit
        except Exception as e:
            # Never crash on per-keyword errors
            print(f"⚠️ Fetch error for {country}/{kw}: {e}")
            continue
    # Dedupe by 'id' (the URL), first occurrence wins, order preserved.
    seen_ids = set()
    unique: List[Dict[str, Any]] = []
    for row in collected:
        rid = row.get("id")
        if rid and rid not in seen_ids:
            seen_ids.add(rid)
            unique.append(row)
    return unique
def translate_term(term: str, target_lang: str) -> str:
    """Translate *term* into *target_lang* via the LLM.

    English (any casing) and an empty language code are fast paths that
    return the term unchanged without calling the model.
    """
    normalized = target_lang.lower()
    if normalized == "en" or normalized == "":
        return term
    prompt = f"Translate the following word into {target_lang}. Only give the translation:\n\n{term}"
    return query_llm(prompt, model=MODEL)
def translate_articles_to_english(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Add English translations of each article's title and description.

    Sets 'title_en' and 'description_en' on every dict (empty string when the
    source field is missing/empty), leaving the native fields untouched.
    Mutates the input dicts in place and returns them as a new list.
    """
    translated = []
    for article in articles:
        for src_key, dst_key in (("title", "title_en"), ("description", "description_en")):
            text = article.get(src_key)
            if text:
                article[dst_key] = query_llm(
                    f"Translate this into English (short and precise, keep meaning):\n\n{text}"
                )
            else:
                article[dst_key] = ""
        translated.append(article)
    return translated
def save_articles_to_txt(articles: List[Dict[str, Any]]):
    """
    Serialize all fetched articles into one plain-text summary and upload it
    to the shared Supabase 'Databases' bucket at shared/db3/news_summary.txt.

    Prefers the English title/description when available; no-op (with a
    warning) when the article list is empty.
    """
    if not articles:
        print("⚠️ No articles to upload to Supabase (db3 summary)")
        return
    separator = "-" * 80
    lines = []
    for article in articles:
        source_name = (article.get("source") or {}).get("name", "")
        lines.extend([
            f"{article.get('title_en') or article.get('title')}",
            f"{article.get('description_en') or article.get('description')}",
            f"URL: {article.get('url')}",
            f"Date: {article.get('published_at','')}",
            f"Source: {source_name}",
            separator,
        ])
    # Example bucket structure: shared/db3
    bucket = "Databases"
    path = "shared/db3/news_summary.txt"
    upload_text(bucket=bucket, path=path, text="\n".join(lines))
    print(f"✅ Saved {len(articles)} articles to Supabase bucket {bucket}/news_summary.txt")
def fetch_search(
    api_key: str,
    *,
    bucket: str,
    q: str,
    lang: str,
    country: str | None,
    max_items: int,
    pages: int,
    sort_by: str,
    date_from: str,
    date_to: str,
):
    """
    Run one GNews /search query and normalize the response articles.

    Args:
        api_key: GNews API key.
        bucket: keyword/topic label stamped onto every returned row.
        q: search query string (already translated if needed).
        lang: language filter code.
        country: ISO2 country filter, or None for a global query.
        max_items: maximum number of articles ("max" parameter).
        pages: page number ("page" parameter).
        sort_by: GNews "sortby" value (e.g. "relevance").
        date_from / date_to: ISO8601 window bounds.

    Returns:
        A list of normalized article dicts (possibly empty on failure);
        never raises on HTTP errors — gnews_get handles retries/logging.
    """
    params = {
        "q": q,
        "max": max_items,
        "page": pages,
        "sortby": sort_by,
        "lang": lang,
        "from": date_from,
        "to": date_to,
        "apikey": api_key,
    }
    # Fix: the original pre-seeded params with "country": None and then
    # conditionally overwrote it; requests silently drops None params, so the
    # None entry was dead. Only add the filter when a country was requested.
    if country:
        params["country"] = country
    # Fix: use the module-level SEARCH_URL constant instead of duplicating the
    # endpoint literal here.
    data = gnews_get(SEARCH_URL, params)
    if not data or "articles" not in data:
        return []  # graceful failure
    rows = []
    for a in data.get("articles", []):
        rows.append({
            "id": a.get("url"),  # use url as stable id for dedupe
            "title": a.get("title"),
            "description": a.get("description"),
            "url": a.get("url"),
            "published_at": a.get("publishedAt"),
            "source": a.get("source", {}),
            "lang": lang,
            "bucket": bucket,
            "country": country or "",
        })
    return rows
def dedupe_by_id(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop duplicate records by their 'id' key, keeping the first occurrence.

    Order is preserved. Raises KeyError if a record lacks 'id'.
    """
    unique: List[Dict[str, Any]] = []
    seen_ids = set()
    for record in items:
        rid = record["id"]
        if rid not in seen_ids:
            seen_ids.add(rid)
            unique.append(record)
    return unique
def gnews_get(url: str, params: dict, max_retries: int = 3, timeout: int = 15):
    """
    Safe GET with exponential backoff.

    Retries transient failures (HTTP 429/5xx and request exceptions) with
    1s, 2s, 4s sleeps between attempts.

    Returns:
        Parsed JSON on success, or None after the final failure (logged).

    Fixes vs. the original:
    - no pointless sleep after the last attempt;
    - exhausting retries on transient status codes now logs a message and
      returns None explicitly instead of silently falling off the loop.
    """
    for attempt in range(max_retries):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            if r.status_code in (429, 500, 502, 503, 504):
                # transient: back off then retry (skip the sleep on the last try)
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
                continue
            r.raise_for_status()
            return r.json()
        except requests.RequestException as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            print(f"⚠️ GNews request failed: {e}")
            return None
    # Every attempt returned a transient status (429/5xx) without raising.
    print(f"⚠️ GNews request failed after {max_retries} attempts (transient status)")
    return None
def insert_new_articles_to_faiss(articles: List[Dict[str, Any]]):
    """
    Insert new (existing=False) articles from unmatched_articles into FAISS (db3) in Supabase.

    Each article becomes one Document whose page_content is the English
    title + description; native/English text, URL, parsed date, source and
    language are stored as metadata. The existing db3 index is downloaded,
    merged with the new documents and re-uploaded; if no index exists yet,
    a fresh one is created.
    """
    docs = []
    for a in articles:
        # Only the English text is embedded; native text goes into metadata.
        page_content = (a.get("title_en") or "") + "\n" + (a.get("description_en") or "")
        date_str = a.get("published_at")
        try:
            # Normalize to local-timezone ISO8601; unparseable/missing dates become None.
            date_val = parser.parse(date_str).astimezone().isoformat() if date_str else None
        except Exception:
            date_val = None
        docs.append(Document(
            page_content=page_content,
            metadata={
                # NOTE(review): assumes DB-row field names (title_native /
                # description_native, as written by upsert_unmatched_articles);
                # raw GNews rows use 'title'/'description' instead — confirm
                # which shape callers pass in.
                "title_native": a.get("title_native"),
                "title_en": a.get("title_en"),
                "summary_native": a.get("description_native"),
                "summary_en": a.get("description_en"),
                "url": a.get("url"),
                "date": date_val,
                "source": a.get("source", ""),
                "lang": a.get("lang", "")
            }
        ))
    if not docs:
        print("⚠️ No new documents to insert into FAISS.")
        return
    new_db = FAISS.from_documents(docs, embeddings)
    try:
        # Download existing FAISS from Supabase (if present)
        tmp_dir = download_faiss_from_supabase("db3", username="None")
        # allow_dangerous_deserialization: the pickle comes from our own bucket.
        existing = FAISS.load_local(tmp_dir, embeddings, allow_dangerous_deserialization=True)
        existing.merge_from(new_db)
        save_faiss_to_supabase(existing, db_key="db3", username="None")
        print(f"✅ Merged {len(docs)} new articles into FAISS (db3)")
    except FileNotFoundError:
        # If no FAISS exists, just upload the new one
        save_faiss_to_supabase(new_db, db_key="db3", username="None")
        print(f"✅ Created new FAISS (db3) with {len(docs)} articles")
def mark_all_existing():
    """Mark all rows as existing before inserting new ones."""
    # Sets existing=True on every row whose url is non-null (effectively all
    # article rows), so that after the next upsert batch, existing=False
    # identifies exactly the newly-written rows.
    supabase.table("unmatched_articles").update({"existing": True}).not_.is_("url", None).execute()
CHUNK = 500  # batch size for the follow-up existing=False update; adjust if you batch very large lists
def upsert_unmatched_articles(articles: list[dict]) -> int:
    """
    Insert/update articles into public.unmatched_articles keyed by URL.
    - New URL -> inserted.
    - Existing URL -> updated with the new fields.
    After upsert, all batch URLs are set to existing=False (deterministic).
    Returns the number of unique rows written (0 when nothing usable).
    REQUIREMENT (run once in DB):
    CREATE UNIQUE INDEX IF NOT EXISTS unmatched_articles_url_uidx
    ON public.unmatched_articles (url);
    """
    # Build rows to write (ONLY include fields you want to overwrite)
    rows = []
    for a in articles:
        url = a.get("url")
        if not url:
            continue  # a URL is mandatory: it is the conflict key
        rows.append({
            "user_id": MY_SUPABASE_UUID,
            "url": url,
            "title_native": a.get("title"),
            "title_en": a.get("title_en"),
            "description_native": a.get("description"),
            "description_en": a.get("description_en"),
            "published_at": a.get("published_at"), # ensure ISO string compatible with your column type
            "source": (a.get("source") or {}).get("name", ""),
            "lang": a.get("lang"),
            "existing": False, # current batch should be marked as "new"
            "updated_at": datetime.utcnow().isoformat() + "Z", # if column is timestamptz
        })
    if not rows:
        return 0
    # Normalize whitespace in URLs and collapse in-batch duplicates
    # (last occurrence wins); rows whose URL strips to "" are dropped.
    deduped = {}
    for r in rows:
        u = r["url"].strip()
        if u:
            r["url"] = u
            deduped[u] = r
    rows = list(deduped.values())
    # 1) True UPSERT by URL (insert new, update existing)
    # on_conflict="url" requires a UNIQUE index or PK on url
    resp = supabase.table("unmatched_articles").upsert(
        rows,
        on_conflict="url" # <- key difference from naive insert+catch
        # , returning="minimal" # uncomment to reduce payload if supported
    ).execute()
    # NOTE(review): supabase-py v2 raises APIError instead of exposing a
    # .error attribute — this getattr check may be a no-op there; confirm
    # the installed client version.
    if getattr(resp, "error", None):
        raise RuntimeError(f"Upsert failed: {resp.error}")
    # 2) Deterministically set the whole batch to existing=False (covers the UPDATE-on-conflict path)
    urls = [r["url"] for r in rows]
    for i in range(0, len(urls), CHUNK):
        r = supabase.table("unmatched_articles") \
            .update({"existing": False}) \
            .in_("url", urls[i:i+CHUNK]) \
            .execute()
        if getattr(r, "error", None):
            raise RuntimeError(f"Force existing=False failed: {r.error}")
    return len(rows)
# DEPRECATED: older insert-then-catch-duplicate implementation, superseded by
# the on_conflict upsert in upsert_unmatched_articles above. Kept commented out
# for reference only.
# def upsert_unmatched_articles(articles: list[dict]):
#     """Insert or update articles in unmatched_articles table."""
# for a in articles:
# url = a.get("url")
# if not url:
# continue
# record = {
# "user_id": "all",
# "url": url,
# "title_native": a.get("title"),
# "title_en": a.get("title_en"),
# "description_native": a.get("description"),
# "description_en": a.get("description_en"),
# "published_at": a.get("published_at"),
# "source": (a.get("source") or {}).get("name", ""),
# "lang": a.get("lang"),
# "existing": False,
# "updated_at": datetime.utcnow().isoformat()
# }
# try:
# supabase.table("unmatched_articles").insert(record).execute()
# except Exception as e:
# if "duplicate key value" in str(e):
# # Update instead of skip
# supabase.table("unmatched_articles").update(record).eq("url", url).execute()
def get_new_articles():
    """Return all unmatched_articles rows flagged as new (existing=False).

    Returns an empty list when the query yields no data.
    """
    response = (
        supabase.table("unmatched_articles")
        .select("*")
        .eq("existing", False)
        .execute()
    )
    data = getattr(response, "data", None)
    return data if data else []
def _read_countries() -> list[str]:
"""
Read countries across ALL users from user_profiles table.
Return a unique list of ISO2 country codes.
"""
countries = set()
try:
res = supabase.table("user_profiles").select("living_country, origin_country").execute()
if not res.data:
return []
for row in res.data:
for field in ["living_country", "origin_country"]:
c = row.get(field)
if not c:
continue
iso2 = COUNTRY_NAME_TO_ISO2.get(c.strip().lower())
if iso2:
countries.add(iso2)
else:
print(f"⚠️ No ISO2 mapping for '{c}'")
except Exception as e:
print(f"⚠️ Failed to fetch user_profiles: {e}")
return list(countries)
# Provides countries and languages for a single user, used in the db6 and db7
# pipelines (kept in this module because the country/language maps it needs are defined here).
def get_user_countries_and_languages() -> tuple[list[str], list[str]]:
    """
    Derive country and language preferences from the first user_profiles row.

    Reads living_country and origin_country, converts them to ISO2 codes via
    COUNTRY_NAME_TO_ISO2, then expands each code into native languages via
    COUNTRY_LANG_MAP. English ("en") is always included in the language list.

    Returns:
        user_countries: list of ISO2 codes (e.g., ["au","it"])
        user_languages: list of language codes (e.g., ["en","it"])

    Fixes vs. the original:
    - "en" was only returned on the empty-result path, despite the documented
      "always include English" contract; it is now always seeded.
    - the "No ISO2 mapping" warning fired even for valid-but-duplicate codes
      (e.g. living == origin country) and contained a stray ')' in the message.
    """
    user_countries: list[str] = []
    user_languages: list[str] = ["en"]  # English always included
    try:
        res = supabase.table("user_profiles").select("living_country, origin_country").execute()
        if not res.data:
            return [], ["en"]
        # NOTE(review): only the first profile row is considered — confirm intended.
        row = res.data[0]
        for field in ["living_country", "origin_country"]:
            c = row.get(field)
            if not c:
                continue
            iso2 = COUNTRY_NAME_TO_ISO2.get(c.strip().lower())
            if iso2 is None:
                print(f"⚠️ No ISO2 mapping for '{c}'")
                continue
            if iso2 not in user_countries:
                user_countries.append(iso2)
                # add languages for this country if available
                for lang in COUNTRY_LANG_MAP.get(iso2, []):
                    if lang not in user_languages:
                        user_languages.append(lang)
    except Exception as e:
        print(f"⚠️ Failed to fetch countries/langs for user: {e}")
    return user_countries, user_languages