# FarmLingua backend — scheduled RAG vectorstore updater.
# farmlingua_backend/app/tasks/rag_updater.py
import logging
import os
import sys
from datetime import date, datetime, timezone

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.embeddings import SentenceTransformerEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS

from app.utils import config
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| if BASE_DIR not in sys.path: | |
| sys.path.insert(0, BASE_DIR) | |
| logging.basicConfig( | |
| format="%(asctime)s [%(levelname)s] %(message)s", | |
| level=logging.INFO | |
| ) | |
| session = requests.Session() | |
| def fetch_weather_now(): | |
| """Fetch current weather for all configured states.""" | |
| docs = [] | |
| for state in config.STATES: | |
| try: | |
| url = "http://api.weatherapi.com/v1/current.json" | |
| params = { | |
| "key": config.WEATHER_API_KEY, | |
| "q": f"{state}, Nigeria", | |
| "aqi": "no" | |
| } | |
| res = session.get(url, params=params, timeout=10) | |
| res.raise_for_status() | |
| data = res.json() | |
| if "current" in data: | |
| condition = data['current']['condition']['text'] | |
| temp_c = data['current']['temp_c'] | |
| humidity = data['current']['humidity'] | |
| text = ( | |
| f"Weather in {state}: {condition}, " | |
| f"Temperature: {temp_c}°C, Humidity: {humidity}%" | |
| ) | |
| docs.append(Document( | |
| page_content=text, | |
| metadata={ | |
| "source": "WeatherAPI", | |
| "location": state, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| )) | |
| except Exception as e: | |
| logging.error(f"Weather fetch failed for {state}: {e}") | |
| return docs | |
| def fetch_harvestplus_articles(): | |
| """Fetch ALL today's articles from HarvestPlus site.""" | |
| try: | |
| res = session.get(config.DATA_SOURCES["harvestplus"], timeout=10) | |
| res.raise_for_status() | |
| soup = BeautifulSoup(res.text, "html.parser") | |
| articles = soup.find_all("article") | |
| docs = [] | |
| today_str = date.today().strftime("%Y-%m-%d") | |
| for a in articles: | |
| content = a.get_text(strip=True) | |
| if content and len(content) > 100: | |
| if today_str in a.text or True: | |
| docs.append(Document( | |
| page_content=content, | |
| metadata={ | |
| "source": "HarvestPlus", | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| )) | |
| return docs | |
| except Exception as e: | |
| logging.error(f"HarvestPlus fetch failed: {e}") | |
| return [] | |
| def build_rag_vectorstore(reset=False): | |
| job_type = "FULL REBUILD" if reset else "INCREMENTAL UPDATE" | |
| logging.info(f"RAG update started — {job_type}") | |
| all_docs = fetch_weather_now() + fetch_harvestplus_articles() | |
| logging.info(f"Weather docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'WeatherAPI'])}") | |
| logging.info(f"News docs fetched: {len([d for d in all_docs if d.metadata['source'] == 'HarvestPlus'])}") | |
| if not all_docs: | |
| logging.warning("No documents fetched, skipping update") | |
| return | |
| splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64) | |
| chunks = splitter.split_documents(all_docs) | |
| embedder = SentenceTransformerEmbeddings(model_name=config.EMBEDDING_MODEL) | |
| vectorstore_path = config.LIVE_VS_PATH | |
| if reset and os.path.exists(vectorstore_path): | |
| for file in os.listdir(vectorstore_path): | |
| file_path = os.path.join(vectorstore_path, file) | |
| try: | |
| os.remove(file_path) | |
| logging.info(f"Deleted old file: {file_path}") | |
| except Exception as e: | |
| logging.error(f"Failed to delete {file_path}: {e}") | |
| if os.path.exists(vectorstore_path) and not reset: | |
| vs = FAISS.load_local( | |
| vectorstore_path, | |
| embedder, | |
| allow_dangerous_deserialization=True | |
| ) | |
| vs.add_documents(chunks) | |
| else: | |
| vs = FAISS.from_documents(chunks, embedder) | |
| os.makedirs(vectorstore_path, exist_ok=True) | |
| vs.save_local(vectorstore_path) | |
| logging.info(f"Vectorstore updated at {vectorstore_path}") | |
| def schedule_updates(): | |
| scheduler = BackgroundScheduler() | |
| scheduler.add_job(build_rag_vectorstore, 'interval', hours=12, kwargs={"reset": False}) | |
| scheduler.add_job(build_rag_vectorstore, 'interval', days=7, kwargs={"reset": True}) | |
| scheduler.start() | |
| logging.info("Scheduler started — 12-hour incremental updates + weekly full rebuild") | |
| return scheduler | |