Spaces:
Sleeping
Sleeping
| import os | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime, timedelta | |
| from openai import OpenAI | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.documents import Document | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from typing import List, Dict | |
| from dateutil import parser | |
| from supabase import create_client | |
| import trafilatura | |
| from Retrieve import retrieve_from_db | |
| from ask_llm_final_prompt import ask_socrates | |
| from translate_query_response import detect_language, translate_from_english | |
| from supabase_ie import upload_text, download_faiss_from_supabase, save_faiss_to_supabase, upload_json | |
| from config import SUPABASE_URL, SUPABASE_SERVICE_KEY, OPENAI_CLASSIFIER_MODEL, GNEWS_KEY,HF_EMBEDDING_MODEL | |
# === CONFIG ===
# Module-wide Supabase client used by the storage and table calls in this file.
supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)

# OpenAI client plus the classifier model name taken from config.
client = OpenAI(api_key=os.getenv("OPENAI_KEY"))
MODEL = OPENAI_CLASSIFIER_MODEL

# GNews search endpoint and API key.
SEARCH_URL = "https://gnews.io/api/v4/search"
gnews_key = GNEWS_KEY

# Request defaults and matching threshold.
DEFAULT_TIMEOUT = 25
SIMILARITY_THRESHOLD = 0.6
UA = {"User-Agent": "Genesis-NewsBot/1.0 (+internal-use)"}

# Embedding model shared by every FAISS load/build in this file;
# normalize_embeddings=True asks the encoder for normalized vectors.
embeddings = HuggingFaceEmbeddings(
    model_name=HF_EMBEDDING_MODEL,
    encode_kwargs={"normalize_embeddings": True},
)
def upsert_detailed_matches_to_faiss(detailed_records: List[Dict[str, Any]], username: str):
    """
    Create or extend the user's FAISS index "db6" from full article texts.

    Each record becomes one Document whose content is ``full_text_en``
    (falling back to ``full_text_raw``). Records with neither are skipped.
    The new documents are merged into the existing db6 index when one can be
    downloaded; otherwise a fresh index is saved.

    Args:
        detailed_records: Article dicts; the keys read are full_text_en,
            full_text_raw, published_at/date, title, description, url, source,
            lang, matched_topic, topic_type and similarity_score.
        username: Passed through to the Supabase FAISS download/save helpers.

    Returns:
        None. Progress is reported via print().
    """
    if not detailed_records:
        print("⚠️ No detailed records to upsert into FAISS.")
        return

    docs = []
    for r in detailed_records:
        # Prefer the translated English text, fall back to the raw text.
        full_text = r.get("full_text_en") or r.get("full_text_raw")
        if not full_text:
            continue

        # Prefer published_at, fall back to date.
        date_str = r.get("published_at") or r.get("date")
        try:
            # NOTE(review): astimezone() treats a naive timestamp as local time
            # of the running process — confirm that is intended.
            date_val = parser.parse(date_str).astimezone().isoformat() if date_str else None
        except Exception:
            # Best effort: an unparseable date is stored as None, not fatal.
            date_val = None

        docs.append(Document(
            page_content=full_text,
            metadata={
                "title": r.get("title"),
                "description": r.get("description"),
                "url": r.get("url"),
                "date": date_val,
                "source": r.get("source"),
                "lang": r.get("lang"),
                "matched_topic": r.get("matched_topic"),
                "topic_type": r.get("topic_type"),
                "similarity_score": r.get("similarity_score"),
            },
        ))

    if not docs:
        print("⚠️ No valid content in detailed records.")
        return

    new_db = FAISS.from_documents(docs, embeddings)

    # Only the download/load step may decide that no index exists yet. Keeping
    # merge/save outside this try prevents a FileNotFoundError raised while
    # saving the merged index from overwriting it with just the new documents.
    try:
        tmp_dir = download_faiss_from_supabase("db6", username=username)
        # SECURITY: allow_dangerous_deserialization=True unpickles the stored
        # index; only safe while the bucket content is fully trusted.
        existing = FAISS.load_local(tmp_dir, embeddings, allow_dangerous_deserialization=True)
    except FileNotFoundError:
        existing = None

    if existing is None:
        save_faiss_to_supabase(new_db, db_key="db6", username=username)
        print(f"✅ Created new FAISS (db6) with {len(docs)} translated records for {username}")
        return

    existing.merge_from(new_db)
    save_faiss_to_supabase(existing, db_key="db6", username=username)
    print(f"✅ Merged {len(docs)} new translated records into FAISS (db6) for {username}")
def save_topic_matched(username: str, matched: list[dict], suffix: str = "all"):
    """
    Save matched article summaries into Supabase bucket users/user_<username>/db6/.
    File name format: topic_match_<suffix>_<timestamp>.txt (timestamp in UTC).

    Each article becomes one line "[SUFFIX] <title> - <description>"; a missing
    or None title/description is written as an empty string.

    Args:
        username: Used to build the storage path.
        matched: Article dicts; only 'title' and 'description' are read.
        suffix: Label used in the file name and, upper-cased, in each line.

    Returns:
        None. Nothing is uploaded when ``matched`` is empty.
    """
    if not matched:
        print(f"⚠️ No {suffix} matches to save for {username}")
        return

    # Local import keeps this block self-contained; datetime.utcnow() is
    # deprecated, and now(timezone.utc) yields the same formatted stamp.
    from datetime import timezone

    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filename = f"topic_match_{suffix}_{stamp}.txt"

    tag = suffix.upper()
    content = "\n".join(
        # `or ''` also covers keys that are present but None.
        f"[{tag}] {m.get('title') or ''} - {m.get('description') or ''}"
        for m in matched
    )

    bucket = "Databases"
    path = f"users/user_{username}/db6/{filename}"
    supabase.storage.from_(bucket).upload(
        path,
        content.encode("utf-8"),
        {"content-type": "text/plain"},
    )
    print(f"✅ Saved {len(matched)} {suffix} matches to {path}")
def fetch_full_article(url: Optional[str]) -> Optional[str]:
    """
    Try to fetch and extract full article text from URL.

    Args:
        url: Article address; an empty or None value is rejected up front
            without any network access.

    Returns:
        The text extracted by trafilatura (comments and tables excluded), or
        None when the URL is missing, the download yields nothing, or any
        error occurs. Errors are reported via print() and never raised.
    """
    # Callers pass a.get("url"), which may be None; skip the download entirely.
    if not url:
        return None
    try:
        downloaded = trafilatura.fetch_url(url)
        if not downloaded:
            return None
        return trafilatura.extract(downloaded, include_comments=False, include_tables=False)
    except Exception as exc:
        # Deliberately best-effort: one bad page must not abort the batch.
        print(f"⚠️ Could not fetch article from {url}: {exc}")
        return None
def _format_article_preview(rec: Dict[str, Any], max_chars: int = 2000) -> List[str]:
    """Build the TXT-preview lines for one fetched article record."""
    # topic_type / similarity_score may be None; format them defensively so
    # the preview never crashes after the JSON has already been uploaded.
    topic_type = str(rec.get("topic_type") or "").upper()
    score = rec.get("similarity_score")
    score_txt = f"{score:.2f}" if isinstance(score, (int, float)) else "n/a"
    body = rec.get("full_text_en")
    snippet = body[:max_chars] + "..." if body else "(no content)"
    return [
        f"[{topic_type}:{rec.get('matched_topic')}] ({score_txt}) {rec.get('title')}",
        f"URL: {rec.get('url')}\n",
        snippet + "\n",
        "=" * 80 + "\n",
    ]


def save_full_articles(username: str, matched: List[Dict[str, Any]], top_n: int = 4):
    """
    Fetch and save full text of top-N matched articles into Supabase (db6).

    Articles are tried in descending similarity_score order until ``top_n``
    full texts have been fetched. Each fetched text is language-detected and,
    when not English, passed through the translation helper. The result is
    uploaded as JSON (structured) and as a TXT preview (readable).

    Args:
        username: Used to build the storage paths under users/user_<username>/db6/.
        matched: Candidate article dicts (url, similarity_score, title, ...).
        top_n: Maximum number of articles to keep.

    Returns:
        List of successfully fetched records; [] when none could be fetched
        (in which case nothing is uploaded).
    """
    selected = []
    # `or 0` keeps the sort working when a score is missing or None.
    # NOTE(review): this treats a higher score as better — confirm against how
    # the scores are produced (a distance metric would need ascending order).
    ranked = sorted(matched, key=lambda x: x.get("similarity_score") or 0, reverse=True)
    for a in ranked:
        if len(selected) >= top_n:
            break

        full_text_raw = fetch_full_article(a.get("url"))
        if not full_text_raw:
            continue

        # Detect language and translate if not English.
        lang = detect_language(full_text_raw) or a.get("lang", "unknown")
        if lang != "en":
            try:
                # NOTE(review): the helper is named translate_FROM_english and
                # is called elsewhere in this file with a target language as a
                # second argument; this call probably needs a to-English
                # helper — verify against translate_query_response.
                full_text_en = translate_from_english(full_text_raw)
            except Exception:
                print(f"⚠️ Translation failed for {a.get('url')}, keeping raw text.")
                full_text_en = full_text_raw
        else:
            full_text_en = full_text_raw

        selected.append({
            "topic_type": a.get("topic_type"),
            "matched_topic": a.get("matched_topic"),
            "similarity_score": a.get("similarity_score"),
            "title": a.get("title"),
            "description": a.get("description"),
            "url": a.get("url"),
            "full_text_raw": full_text_raw,
            "full_text_en": full_text_en,  # translated version
            # Match records in this file carry the timestamp under "date".
            "published_at": a.get("published_at") or a.get("date"),
            "source": a.get("source"),
            "lang": lang,
        })

    if not selected:
        print("⚠️ No full articles could be fetched.")
        return []

    bucket = "Databases"

    # --- Upload JSON with both raw + translated ---
    path_json = f"users/user_{username}/db6/topic_matched_full.json"
    upload_json(bucket=bucket, path=path_json, data=selected)

    # --- Upload TXT preview (translated English text only) ---
    lines = []
    for rec in selected:
        lines.extend(_format_article_preview(rec))
    path_txt = f"users/user_{username}/db6/topic_matched_full.txt"
    upload_text(bucket=bucket, path=path_txt, text="\n".join(lines))

    print(f"✅ Saved {len(selected)} full articles to Supabase for user_{username}/db6")
    return selected
# Prompt used by trigger_proactive_news both as the db6 retrieval query and as
# the user input handed to ask_socrates. The quoted phrases are examples the
# model is asked to imitate, so their spelling matters for the final message.
NEWS_PROMPT = """Can you send me a message where you tell me about one interesting news you have read about.
Take this news from your database db6. Pretend I didn’t ask for it.
Make it sound natural, e.g.: 'Hey, have you heard about that news...?' and then continue. Do not include citations, footnotes, or source links.
Insert the reference within the message, e.g. 'I read this news on this journal...'"""
def _to_supabase_row(record: dict, user_id: str) -> dict:
    """Convert one match record into a JSON-safe row for 'matched_articles_fromdb3'."""
    raw_date = record.get("date")
    try:
        date_val = parser.parse(raw_date).isoformat() if raw_date else None
    except Exception:
        # Best effort: an unparseable date is stored as NULL.
        date_val = None
    return {
        "user_id": user_id,
        "title": str(record.get("title") or ""),
        "description": str(record.get("description") or ""),
        "url": str(record.get("url") or ""),
        "date": date_val,
        "topic_type": str(record.get("topic_type")),
        "matched_topic": str(record.get("matched_topic")),
        "similarity_score": float(record.get("similarity_score", 0)),
        "source": str(record.get("source") or ""),
        "lang": str(record.get("lang") or ""),
        "downloaded": False,
    }


def match_topics_in_db3(
    topics: list[str],
    topic_type: str,
    username: str,
    user_id: str,
    k: int = 10
) -> list[dict]:
    """
    Match topics directly against FAISS db3 (shared).
    Save results into Supabase table 'matched_articles_fromdb3'.

    For every topic the top ``k`` hits of similarity_search_with_score are
    collected and inserted one row at a time. An insert whose error text
    contains "duplicate key value" is logged as a skip; any other insert
    error is logged and processing continues.

    Args:
        topics: Topic strings to search for; empty/None returns [] at once.
        topic_type: Label stored with each match.
        username: Not used in this function (db3 is loaded from "shared").
        user_id: Stored with each inserted row.
        k: Number of hits requested per topic.

    Returns:
        List of match records (one per hit, duplicates included), each with
        title/title_en, description/description_en, url, date, source, lang,
        matched_topic, similarity_score, topic_type and downloaded=False.
    """
    if not topics:
        return []

    # Load db3 FAISS from SHARED location.
    tmp_dir = download_faiss_from_supabase("db3", username="shared")
    # SECURITY: allow_dangerous_deserialization=True unpickles the stored
    # index; only safe while the bucket content is fully trusted.
    db3_vs = FAISS.load_local(tmp_dir, embeddings, allow_dangerous_deserialization=True)

    matched = []
    for topic in topics:
        docs_and_scores = db3_vs.similarity_search_with_score(topic, k=k)
        print(f"[DEBUG][SEARCH] topic='{topic}' → results={len(docs_and_scores)}")

        for d, score in docs_and_scores:
            meta = d.metadata
            # A stored None title must not crash a debug print.
            title_preview = str(meta.get("title_en") or "")[:60]
            print(f" ↳ title='{title_preview}' | score={score:.3f} | date={meta.get('date')}")

            # NOTE(review): whether a higher or lower score means "more
            # similar" depends on how db3 was built — confirm before ranking.
            record = {
                "title": meta.get("title_native", ""),
                "title_en": meta.get("title_en", ""),
                "description": meta.get("summary_native", ""),
                "description_en": meta.get("summary_en", ""),
                "url": meta.get("url", ""),
                "date": meta.get("date", ""),
                "source": meta.get("source", ""),
                "lang": meta.get("lang", ""),
                "matched_topic": topic,
                "similarity_score": float(score),
                "topic_type": topic_type,
                "downloaded": False,
            }
            matched.append(record)

            safe_record = _to_supabase_row(record, user_id)

            # --- Try insert, skip if duplicate ---
            try:
                supabase.table("matched_articles_fromdb3").insert(safe_record).execute()
                print(f"[DEBUG][INSERT-RESULT] Inserted new row for url={safe_record['url']}")
            except Exception as e:
                if "duplicate key value" in str(e):
                    print(f"[DEBUG][SKIP] Duplicate → url={safe_record['url']} | topic={safe_record['matched_topic']}")
                else:
                    print(f"⚠️ Insert failed for url={safe_record['url']}: {e}")

        print(f"[DEBUG][MATCH] topic='{topic}' → {len(docs_and_scores)} matches processed")

    return matched
def get_recent_matches_fromdb3(
    topic_type: str,
    timedelta_days: int = 7,
    user_id: Optional[str] = None,
    limit: int = 10,
):
    """
    Fetch recent matched articles from Supabase table 'matched_articles_fromdb3',
    filtered by user_id, topic_type (generic/specific) and recency.

    Args:
        topic_type: Value the 'topic_type' column must equal.
        timedelta_days: Only rows whose 'date' is within this many days of now
            (UTC) are returned.
        user_id: Value the 'user_id' column must equal.
        limit: Maximum number of rows requested (previously fixed at 10).

    Returns:
        List of row dicts ordered by similarity_score descending; [] when the
        query fails or returns nothing.
    """
    # Local import keeps this block self-contained. utcnow() is deprecated;
    # dropping tzinfo afterwards keeps the cutoff string in the same naive-UTC
    # format the query has always sent.
    from datetime import timezone

    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = (now_utc - timedelta(days=timedelta_days)).isoformat()

    try:
        res = (
            supabase.table("matched_articles_fromdb3")
            .select("*")
            .eq("user_id", user_id)
            .eq("topic_type", topic_type)
            .gte("date", cutoff)
            .order("similarity_score", desc=True)
            .limit(limit)
            .execute()
        )
    except Exception as e:
        print(f"⚠️ Supabase query failed for topic_type={topic_type}: {e}")
        return []

    matches = res.data if hasattr(res, "data") and res.data else []

    # Safe debug print: None titles/scores must not abort the function.
    print(f"[DEBUG][RECENT] topic_type={topic_type} | cutoff={cutoff} | returned={len(matches)}")
    for r in matches:
        title = str(r.get("title") or "")[:60]
        score = r.get("similarity_score")
        score_txt = f"{score:.3f}" if isinstance(score, (int, float)) else "n/a"
        print(f" ↳ {r.get('date')} | {r.get('matched_topic')} | "
              f"{title} | score={score_txt} | "
              f"downloaded={r.get('downloaded')}")

    return matches
def mark_as_downloaded(user_id: str, url: str, topic: str):
    """
    Set downloaded=True on the rows of 'matched_articles_fromdb3' that match
    the given user_id, url and matched_topic.
    """
    query = supabase.table("matched_articles_fromdb3").update({"downloaded": True})
    filters = (
        ("user_id", user_id),
        ("url", url),
        ("matched_topic", topic),
    )
    # Apply the equality filters in the same order as before, then run it.
    for column, value in filters:
        query = query.eq(column, value)
    query.execute()
| # ___________________trigger proactive news fetch from db6 and generate a Socratic reply | |
def trigger_proactive_news(username: str, user_id: str):
    """
    Build a proactive news message for the user.

    Retrieves the top 3 chunks from db6 using NEWS_PROMPT as the query, asks
    ask_socrates for a "playful" reply on topic "forced_db6", then passes the
    reply through translate_from_english with the language returned by
    get_last_user_language. Returns the translated reply.
    """
    # Retrieve top chunks from db6.
    retrieved = retrieve_from_db(
        db_key="db6",
        query=NEWS_PROMPT,
        model=embeddings,
        username=username,
        k=3,
    )

    socrates_kwargs = {
        "user_input": NEWS_PROMPT,
        "retrieved_chunks": retrieved,
        "user_id": user_id,
        "topic": "forced_db6",
        "response_mode": "playful",
    }
    english_reply = ask_socrates(**socrates_kwargs)

    target_language = get_last_user_language(user_id=user_id)
    return translate_from_english(english_reply, target_language)
| # def get_last_user_language(user_id: str) -> str: | |
| # """Check last message in total history and return its language code. to be used for trigger_proactive_news""" | |
| # total = _load_history("chat_history_total", user_id) | |
| # if not total["sessions"]: | |
| # return "en" | |
| # msgs = total["sessions"][-1]["messages"] | |
| # if not msgs: | |
| # return "en" | |
| # # Look for last user message | |
| # for m in reversed(msgs): | |
| # if m.get("role") == "user": | |
| # return detect_language(m.get("content", "")) or "en" | |
| # return "en" | |
def get_last_user_language(user_id: str, default: str = "en") -> str:
    """
    Look up the user's UI language in the Supabase table 'user_ui_language'.

    Columns are tried in order — last_message_language, then
    initial_language — and the first value that lower-cases to exactly two
    alphabetic characters is returned. When no row exists, neither column
    qualifies, or the lookup raises, ``default`` is returned instead.
    """
    try:
        query = supabase.table("user_ui_language")
        query = query.select("last_message_language, initial_language")
        query = query.eq("user_id", user_id).limit(1)
        response = query.execute()

        records = response.data or []
        if not records:
            return default

        record = records[0]
        for column in ("last_message_language", "initial_language"):
            code = (record.get(column) or "").lower()
            if len(code) == 2 and code.isalpha():
                return code
        return default
    except Exception as e:
        print(f"[get_last_user_language] fallback to default due to error: {e}")
        return default