| | |
import logging
import os
import sys
from collections import Counter
from datetime import date, datetime, timezone

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from bs4 import BeautifulSoup

from langchain_community.docstore.document import Document
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter

from app.utils import config
| |
|
# Make the project root importable when this file is run directly.
# NOTE(review): `app.utils` is imported above, *before* this path insert —
# confirm the intended import order; as written the insert only helps
# imports that happen after this point.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if BASE_DIR not in sys.path:
    sys.path.insert(0, BASE_DIR)

# Root-logger config: timestamped INFO-level messages for all fetch/update jobs.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO
)

# Shared HTTP session so repeated API/scrape calls reuse connections.
session = requests.Session()
| |
|
def fetch_weather_now():
    """Fetch current weather for every state in ``config.STATES``.

    Queries WeatherAPI once per state and builds one ``Document`` per
    successful response that contains a ``current`` section.

    Returns:
        list[Document]: weather summaries; states whose request fails are
        logged and skipped (best-effort, never raises).
    """
    docs = []
    # Loop-invariant endpoint, hoisted out of the per-state loop.
    url = "http://api.weatherapi.com/v1/current.json"
    for state in config.STATES:
        try:
            params = {
                "key": config.WEATHER_API_KEY,
                "q": f"{state}, Nigeria",
                "aqi": "no",
            }
            res = session.get(url, params=params, timeout=10)
            res.raise_for_status()
            data = res.json()

            if "current" in data:
                current = data["current"]
                text = (
                    f"Weather in {state}: {current['condition']['text']}, "
                    f"Temperature: {current['temp_c']}°C, "
                    f"Humidity: {current['humidity']}%"
                )
                docs.append(Document(
                    page_content=text,
                    metadata={
                        "source": "WeatherAPI",
                        "location": state,
                        # Timezone-aware UTC; datetime.utcnow() is deprecated
                        # and produced a naive timestamp.
                        "timestamp": datetime.now(timezone.utc).isoformat(),
                    },
                ))
        except Exception as e:
            # Best-effort: one state failing must not abort the others.
            logging.error("Weather fetch failed for %s: %s", state, e)
    return docs
| |
|
def fetch_harvestplus_articles():
    """Fetch article documents from the HarvestPlus site.

    Scrapes every ``<article>`` element on the configured page and keeps
    those with more than 100 characters of stripped text.

    Returns:
        list[Document]: one document per kept article; an empty list when
        the request or parse fails (errors are logged, never raised).
    """
    try:
        res = session.get(config.DATA_SOURCES["harvestplus"], timeout=10)
        res.raise_for_status()
        soup = BeautifulSoup(res.text, "html.parser")

        # TODO(review): the previous "today only" filter was disabled with a
        # trailing `or True`, so it never filtered anything — ALL long-enough
        # articles were (and still are) returned. Real date filtering needs
        # the site's article-date markup, which is not visible here.
        docs = []
        for article in soup.find_all("article"):
            content = article.get_text(strip=True)
            if content and len(content) > 100:
                docs.append(Document(
                    page_content=content,
                    metadata={
                        "source": "HarvestPlus",
                        # Timezone-aware UTC; datetime.utcnow() is deprecated.
                        "timestamp": datetime.now(timezone.utc).isoformat(),
                    },
                ))
        return docs
    except Exception as e:
        logging.error("HarvestPlus fetch failed: %s", e)
        return []
| |
|
def _clear_vectorstore(path):
    """Best-effort removal of every file directly inside *path*.

    Used before a full rebuild. Failures (e.g. a subdirectory or a
    permission error — ``os.remove`` raises on both) are logged and
    skipped so the rebuild can still proceed.
    """
    for name in os.listdir(path):
        file_path = os.path.join(path, name)
        try:
            os.remove(file_path)
            logging.info("Deleted old file: %s", file_path)
        except Exception as e:
            logging.error("Failed to delete %s: %s", file_path, e)


def build_rag_vectorstore(reset=False):
    """Fetch fresh documents and update the live FAISS vectorstore.

    Args:
        reset: when True, wipe the files in the existing store and rebuild
            the index from scratch; when False, load the existing index
            (if any) and append the new chunks to it.

    Side effects: writes the index to ``config.LIVE_VS_PATH``. Returns
    early (no write) when no documents could be fetched.
    """
    job_type = "FULL REBUILD" if reset else "INCREMENTAL UPDATE"
    logging.info("RAG update started — %s", job_type)

    all_docs = fetch_weather_now() + fetch_harvestplus_articles()

    # One pass over the docs instead of one list comprehension per source.
    source_counts = Counter(d.metadata["source"] for d in all_docs)
    logging.info("Weather docs fetched: %d", source_counts["WeatherAPI"])
    logging.info("News docs fetched: %d", source_counts["HarvestPlus"])

    if not all_docs:
        logging.warning("No documents fetched, skipping update")
        return

    splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
    chunks = splitter.split_documents(all_docs)

    embedder = SentenceTransformerEmbeddings(model_name=config.EMBEDDING_MODEL)
    vectorstore_path = config.LIVE_VS_PATH

    if reset and os.path.exists(vectorstore_path):
        _clear_vectorstore(vectorstore_path)

    if os.path.exists(vectorstore_path) and not reset:
        # The index files are pickle-based; deserializing is only acceptable
        # because we wrote them ourselves in a previous run.
        vs = FAISS.load_local(
            vectorstore_path,
            embedder,
            allow_dangerous_deserialization=True,
        )
        vs.add_documents(chunks)
    else:
        vs = FAISS.from_documents(chunks, embedder)

    os.makedirs(vectorstore_path, exist_ok=True)
    vs.save_local(vectorstore_path)

    logging.info("Vectorstore updated at %s", vectorstore_path)
| |
|
def schedule_updates():
    """Start the background scheduler that drives periodic RAG updates.

    Registers two recurring jobs — an incremental update every 12 hours and
    a full rebuild every 7 days — starts the scheduler, and returns it so
    the caller can shut it down later.
    """
    scheduler = BackgroundScheduler()
    job_specs = (
        ({"reset": False}, {"hours": 12}),
        ({"reset": True}, {"days": 7}),
    )
    for job_kwargs, interval in job_specs:
        scheduler.add_job(build_rag_vectorstore, "interval", kwargs=job_kwargs, **interval)
    scheduler.start()
    logging.info("Scheduler started — 12-hour incremental updates + weekly full rebuild")
    return scheduler
| |
|