# farmlingua_backend/app/tasks/rag_updater.py
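"""Scheduled RAG updater for FarmLingua.

Fetches live weather (WeatherAPI) and HarvestPlus articles, embeds them, and
maintains a FAISS vectorstore: incremental updates every 12 hours and a full
rebuild weekly.
"""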
import os
import sys
from datetime import datetime, date, timezone
import logging
import requests
from bs4 import BeautifulSoup
from apscheduler.schedulers.background import BackgroundScheduler
from langchain.vectorstores import FAISS
from langchain.embeddings import SentenceTransformerEmbeddings
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Make the project root (the directory containing the ``app`` package)
# importable before pulling in app modules.
BASE_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
if BASE_DIR not in sys.path:
    sys.path.insert(0, BASE_DIR)

from app.utils import config
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
level=logging.INFO
)
session = requests.Session()
def fetch_weather_now():
"""Fetch current weather for all configured states."""
docs = []
for state in config.STATES:
try:
            url = "https://api.weatherapi.com/v1/current.json"
params = {
"key": config.WEATHER_API_KEY,
"q": f"{state}, Nigeria",
"aqi": "no"
}
res = session.get(url, params=params, timeout=10)
res.raise_for_status()
data = res.json()
if "current" in data:
condition = data['current']['condition']['text']
temp_c = data['current']['temp_c']
humidity = data['current']['humidity']
text = (
f"Weather in {state}: {condition}, "
f"Temperature: {temp_c}°C, Humidity: {humidity}%"
)
docs.append(Document(
page_content=text,
metadata={
"source": "WeatherAPI",
"location": state,
"timestamp": datetime.utcnow().isoformat()
}
))
except Exception as e:
logging.error(f"Weather fetch failed for {state}: {e}")
return docs
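
# Illustrative shape of a document produced by fetch_weather_now(); the values
# are made up, but the format follows the f-string above:
#   page_content: "Weather in Kano: Sunny, Temperature: 31.0°C, Humidity: 40%"
#   metadata: {"source": "WeatherAPI", "location": "Kano",
#              "timestamp": "2025-01-01T12:00:00+00:00"}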
def fetch_harvestplus_articles():
"""Fetch ALL today's articles from HarvestPlus site."""
try:
res = session.get(config.DATA_SOURCES["harvestplus"], timeout=10)
res.raise_for_status()
soup = BeautifulSoup(res.text, "html.parser")
articles = soup.find_all("article")
docs = []
today_str = date.today().strftime("%Y-%m-%d")
for a in articles:
content = a.get_text(strip=True)
            if content and len(content) > 100:
                # Best-effort date filter: keep articles whose text mentions
                # today's ISO date.
                if today_str in a.text:
docs.append(Document(
page_content=content,
metadata={
"source": "HarvestPlus",
"timestamp": datetime.utcnow().isoformat()
}
))
return docs
except Exception as e:
logging.error(f"HarvestPlus fetch failed: {e}")
return []
def build_rag_vectorstore(reset=False):
    """Rebuild (reset=True) or incrementally update the live vectorstore."""
job_type = "FULL REBUILD" if reset else "INCREMENTAL UPDATE"
logging.info(f"RAG update started — {job_type}")
all_docs = fetch_weather_now() + fetch_harvestplus_articles()
logging.info(f"Weather docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'WeatherAPI'])}")
logging.info(f"News docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'HarvestPlus'])}")
if not all_docs:
logging.warning("No documents fetched, skipping update")
return
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
chunks = splitter.split_documents(all_docs)
embedder = SentenceTransformerEmbeddings(model_name=config.EMBEDDING_MODEL)
vectorstore_path = config.LIVE_VS_PATH
if reset and os.path.exists(vectorstore_path):
for file in os.listdir(vectorstore_path):
file_path = os.path.join(vectorstore_path, file)
try:
os.remove(file_path)
logging.info(f"Deleted old file: {file_path}")
except Exception as e:
logging.error(f"Failed to delete {file_path}: {e}")
    # Incremental path: append new chunks to the existing index. Duplicate
    # content can accumulate between runs; the weekly full rebuild clears it.
    if os.path.exists(vectorstore_path) and not reset:
vs = FAISS.load_local(
vectorstore_path,
embedder,
allow_dangerous_deserialization=True
)
vs.add_documents(chunks)
else:
vs = FAISS.from_documents(chunks, embedder)
os.makedirs(vectorstore_path, exist_ok=True)
vs.save_local(vectorstore_path)
logging.info(f"Vectorstore updated at {vectorstore_path}")
def schedule_updates():
scheduler = BackgroundScheduler()
scheduler.add_job(build_rag_vectorstore, 'interval', hours=12, kwargs={"reset": False})
scheduler.add_job(build_rag_vectorstore, 'interval', days=7, kwargs={"reset": True})
scheduler.start()
logging.info("Scheduler started — 12-hour incremental updates + weekly full rebuild")
return scheduler
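
# Ad-hoc entry point sketch for running the updater standalone (how it is
# wired into the host application is outside this file). Interval jobs only
# fire after their first interval elapses, so one rebuild is run up front.
if __name__ == "__main__":
    import time

    build_rag_vectorstore(reset=True)
    scheduler = schedule_updates()
    try:
        while True:
            time.sleep(60)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        logging.info("RAG updater stopped")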